other:merge 3.0
This commit is contained in:
parent
698f536cd6
commit
a2a1da06dd
|
@ -27,6 +27,10 @@ else ()
|
||||||
cat("${TD_SUPPORT_DIR}/taosadapter_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
cat("${TD_SUPPORT_DIR}/taosadapter_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
|
if(TD_LINUX_64 AND JEMALLOC_ENABLED)
|
||||||
|
cat("${TD_SUPPORT_DIR}/jemalloc_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||||
|
endif()
|
||||||
|
|
||||||
# pthread
|
# pthread
|
||||||
if(${BUILD_PTHREAD})
|
if(${BUILD_PTHREAD})
|
||||||
cat("${TD_SUPPORT_DIR}/pthread_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
cat("${TD_SUPPORT_DIR}/pthread_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||||
|
@ -392,6 +396,19 @@ if(${BUILD_WITH_SQLITE})
|
||||||
endif(NOT TD_WINDOWS)
|
endif(NOT TD_WINDOWS)
|
||||||
endif(${BUILD_WITH_SQLITE})
|
endif(${BUILD_WITH_SQLITE})
|
||||||
|
|
||||||
|
# jemalloc
|
||||||
|
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
|
||||||
|
include(ExternalProject)
|
||||||
|
ExternalProject_Add(jemalloc
|
||||||
|
PREFIX "jemalloc"
|
||||||
|
SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/jemalloc
|
||||||
|
BUILD_IN_SOURCE 1
|
||||||
|
CONFIGURE_COMMAND ./autogen.sh COMMAND ./configure --prefix=${CMAKE_BINARY_DIR}/build/
|
||||||
|
BUILD_COMMAND ${MAKE}
|
||||||
|
)
|
||||||
|
INCLUDE_DIRECTORIES(${CMAKE_BINARY_DIR}/build/include)
|
||||||
|
ENDIF ()
|
||||||
|
|
||||||
# addr2line
|
# addr2line
|
||||||
if(${BUILD_ADDR2LINE})
|
if(${BUILD_ADDR2LINE})
|
||||||
if(NOT ${TD_WINDOWS})
|
if(NOT ${TD_WINDOWS})
|
||||||
|
|
|
@ -123,7 +123,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table
|
||||||
* @param handle
|
* @param handle
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds);
|
int32_t qExecTask(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* kill the ongoing query and free the query handle and corresponding resources automatically
|
* kill the ongoing query and free the query handle and corresponding resources automatically
|
||||||
|
|
|
@ -604,28 +604,44 @@ _end:
|
||||||
|
|
||||||
static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
|
static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
|
||||||
int64_t suid, int8_t blkType) {
|
int64_t suid, int8_t blkType) {
|
||||||
while (1) {
|
SArray *pResList = taosArrayInit(1, POINTER_BYTES);
|
||||||
SSDataBlock *output = NULL;
|
if (pResList == NULL) {
|
||||||
uint64_t ts;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t code = qExecTask(taskInfo, &output, &ts);
|
while (1) {
|
||||||
|
uint64_t ts;
|
||||||
|
int32_t code = qExecTask(taskInfo, pResList, &ts);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
smaError("vgId:%d, qExecTask for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), suid,
|
smaError("vgId:%d, qExecTask for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), suid,
|
||||||
pItem->level, terrstr(code));
|
pItem->level, terrstr(code));
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (output) {
|
if (taosArrayGetSize(pResList) == 0) {
|
||||||
#if 0
|
if (terrno == 0) {
|
||||||
|
smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level);
|
||||||
|
} else {
|
||||||
|
smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched since %s", SMA_VID(pSma), pItem->level, terrstr());
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) {
|
||||||
|
SSDataBlock *output = taosArrayGetP(pResList, i);
|
||||||
|
|
||||||
|
#if 1
|
||||||
char flag[10] = {0};
|
char flag[10] = {0};
|
||||||
snprintf(flag, 10, "level %" PRIi8, pItem->level);
|
snprintf(flag, 10, "level %" PRIi8, pItem->level);
|
||||||
SArray *pResult = taosArrayInit(1, sizeof(SSDataBlock));
|
// blockDebugShowDataBlocks(output, flag);
|
||||||
taosArrayPush(pResult, output);
|
// taosArrayDestroy(pResult);
|
||||||
blockDebugShowDataBlocks(pResult, flag);
|
|
||||||
taosArrayDestroy(pResult);
|
|
||||||
#endif
|
#endif
|
||||||
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
|
STsdb * sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
|
||||||
SSubmitReq *pReq = NULL;
|
SSubmitReq *pReq = NULL;
|
||||||
|
|
||||||
// TODO: the schema update should be handled later(TD-17965)
|
// TODO: the schema update should be handled later(TD-17965)
|
||||||
if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) {
|
if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) {
|
||||||
smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s",
|
smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s",
|
||||||
|
@ -644,17 +660,14 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm
|
||||||
SMA_VID(pSma), suid, pItem->level, output->info.version);
|
SMA_VID(pSma), suid, pItem->level, output->info.version);
|
||||||
|
|
||||||
taosMemoryFreeClear(pReq);
|
taosMemoryFreeClear(pReq);
|
||||||
} else if (terrno == 0) {
|
|
||||||
smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level);
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched since %s", SMA_VID(pSma), pItem->level, terrstr());
|
|
||||||
goto _err;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pResList);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
taosArrayDestroy(pResList);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1407,7 +1420,7 @@ int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) {
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
}
|
}
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
|
|
||||||
((SMsgHead *)pBuf)->vgId = SMA_VID(pSma);
|
((SMsgHead *)pBuf)->vgId = SMA_VID(pSma);
|
||||||
((SMsgHead *)pBuf)->contLen = contLen + sizeof(SMsgHead);
|
((SMsgHead *)pBuf)->contLen = contLen + sizeof(SMsgHead);
|
||||||
|
|
||||||
|
@ -1434,10 +1447,10 @@ _err:
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief fetch rsma data of level 2/3 and submit
|
* @brief fetch rsma data of level 2/3 and submit
|
||||||
*
|
*
|
||||||
* @param pSma
|
* @param pSma
|
||||||
* @param pMsg
|
* @param pMsg
|
||||||
* @return int32_t
|
* @return int32_t
|
||||||
*/
|
*/
|
||||||
int32_t smaProcessFetch(SSma *pSma, void *pMsg) {
|
int32_t smaProcessFetch(SSma *pSma, void *pMsg) {
|
||||||
SRpcMsg *pRpcMsg = (SRpcMsg *)pMsg;
|
SRpcMsg *pRpcMsg = (SRpcMsg *)pMsg;
|
||||||
|
|
|
@ -80,14 +80,22 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t rowCnt = 0;
|
int32_t rowCnt = 0;
|
||||||
|
SArray* pResList = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
taosArrayClear(pResList);
|
||||||
|
|
||||||
SSDataBlock* pDataBlock = NULL;
|
SSDataBlock* pDataBlock = NULL;
|
||||||
uint64_t ts = 0;
|
uint64_t ts = 0;
|
||||||
tqDebug("task start to execute");
|
tqDebug("task start to execute");
|
||||||
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
if (qExecTask(task, pResList, &ts) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
tqDebug("task execute end, get %p", pDataBlock);
|
|
||||||
|
if (taosArrayGetSize(pResList) > 0) {
|
||||||
|
pDataBlock = taosArrayGet(pResList, 0);
|
||||||
|
tqDebug("task execute end, get %p", pDataBlock);
|
||||||
|
}
|
||||||
|
|
||||||
if (pDataBlock != NULL) {
|
if (pDataBlock != NULL) {
|
||||||
if (pRsp->withTbName) {
|
if (pRsp->withTbName) {
|
||||||
|
@ -143,6 +151,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pResList);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -69,10 +69,10 @@ static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) {
|
||||||
|
|
||||||
// clang-format off
|
// clang-format off
|
||||||
// data format:
|
// data format:
|
||||||
// +----------------+--------------+--------------+------------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+
|
// +----------------+------------------+--------------+--------------+------------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+
|
||||||
// |SDataCacheEntry | total length | numOfRows | group id | col1_schema | col2_schema | col3_schema... | column#1 length, column#2 length...| col1 bitmap | col1 data | col2 bitmap | col2 data | .... | | (4 bytes) |(8 bytes)
|
// |SDataCacheEntry | version | total length | numOfRows | group id | col1_schema | col2_schema | col3_schema... | column#1 length, column#2 length...| col1 bitmap | col1 data | col2 bitmap | col2 data | .... | | (4 bytes) |(8 bytes)
|
||||||
// | |sizeof(int32) | sizeof(int32)| sizeof(uint64_t) |(sizeof(int16_t)+sizeof(int32_t))*numOfCols | sizeof(int32_t) * numOfCols | actual size | |
|
// | | sizeof(int32_t) |sizeof(int32) | sizeof(int32)| sizeof(uint64_t) | (sizeof(int8_t)+sizeof(int32_t))*numOfCols | sizeof(int32_t) * numOfCols | actual size | |
|
||||||
// +----------------+--------------+--------------+------------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+
|
// +----------------+------------------+--------------+--------------+------------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+
|
||||||
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
|
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
|
||||||
// recorded in the first segment, next to the struct header
|
// recorded in the first segment, next to the struct header
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
|
@ -422,11 +422,17 @@ int waitMoment(SQInfo* pQInfo) {
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
|
static void freeBlock(void* param) {
|
||||||
|
SSDataBlock* pBlock = *(SSDataBlock**) param;
|
||||||
|
blockDataDestroy(pBlock);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t qExecTask(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) {
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
int64_t threadId = taosGetSelfPthreadId();
|
int64_t threadId = taosGetSelfPthreadId();
|
||||||
|
|
||||||
*pRes = NULL;
|
taosArrayClearEx(pResList, freeBlock);
|
||||||
|
|
||||||
int64_t curOwner = 0;
|
int64_t curOwner = 0;
|
||||||
if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
|
if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
|
||||||
qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
|
qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
|
||||||
|
@ -457,23 +463,34 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
|
||||||
|
|
||||||
qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));
|
qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));
|
||||||
|
|
||||||
|
int32_t current = 0;
|
||||||
|
SSDataBlock* pRes = NULL;
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
*pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
|
while((pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot)) != NULL) {
|
||||||
|
SSDataBlock* p = createOneDataBlock(pRes, true);
|
||||||
|
current += p->info.rows;
|
||||||
|
ASSERT(p->info.rows > 0);
|
||||||
|
taosArrayPush(pResList, &p);
|
||||||
|
|
||||||
|
if (current >= 4096) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
uint64_t el = (taosGetTimestampUs() - st);
|
uint64_t el = (taosGetTimestampUs() - st);
|
||||||
|
|
||||||
pTaskInfo->cost.elapsedTime += el;
|
pTaskInfo->cost.elapsedTime += el;
|
||||||
if (NULL == *pRes) {
|
if (NULL == pRes) {
|
||||||
*useconds = pTaskInfo->cost.elapsedTime;
|
*useconds = pTaskInfo->cost.elapsedTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanUpUdfs();
|
cleanUpUdfs();
|
||||||
|
|
||||||
int32_t current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
|
|
||||||
uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
|
uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
|
||||||
|
|
||||||
qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
|
qDebug("%s task suspended, %d rows in %d blocks returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
|
||||||
GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);
|
GET_TASKID(pTaskInfo), current, (int32_t) taosArrayGetSize(pResList), total, 0, el / 1000.0);
|
||||||
|
|
||||||
atomic_store_64(&pTaskInfo->owner, 0);
|
atomic_store_64(&pTaskInfo->owner, 0);
|
||||||
return pTaskInfo->code;
|
return pTaskInfo->code;
|
||||||
|
|
|
@ -75,22 +75,20 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
|
||||||
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
|
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
bool qcontinue = true;
|
bool qcontinue = true;
|
||||||
SSDataBlock *pRes = NULL;
|
|
||||||
uint64_t useconds = 0;
|
uint64_t useconds = 0;
|
||||||
int32_t i = 0;
|
int32_t i = 0;
|
||||||
int32_t execNum = 0;
|
int32_t execNum = 0;
|
||||||
qTaskInfo_t taskHandle = ctx->taskHandle;
|
qTaskInfo_t taskHandle = ctx->taskHandle;
|
||||||
DataSinkHandle sinkHandle = ctx->sinkHandle;
|
DataSinkHandle sinkHandle = ctx->sinkHandle;
|
||||||
|
|
||||||
|
SArray* pResList = taosArrayInit(4, POINTER_BYTES);
|
||||||
while (true) {
|
while (true) {
|
||||||
QW_TASK_DLOG("start to execTask, loopIdx:%d", i++);
|
QW_TASK_DLOG("start to execTask, loopIdx:%d", i++);
|
||||||
|
|
||||||
pRes = NULL;
|
|
||||||
|
|
||||||
// if *taskHandle is NULL, it's killed right now
|
// if *taskHandle is NULL, it's killed right now
|
||||||
if (taskHandle) {
|
if (taskHandle) {
|
||||||
qwDbgSimulateSleep();
|
qwDbgSimulateSleep();
|
||||||
code = qExecTask(taskHandle, &pRes, &useconds);
|
code = qExecTask(taskHandle, pResList, &useconds);
|
||||||
if (code) {
|
if (code) {
|
||||||
if (code != TSDB_CODE_OPS_NOT_SUPPORT) {
|
if (code != TSDB_CODE_OPS_NOT_SUPPORT) {
|
||||||
QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
|
QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
|
||||||
|
@ -103,9 +101,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
|
||||||
|
|
||||||
++execNum;
|
++execNum;
|
||||||
|
|
||||||
if (NULL == pRes) {
|
if (taosArrayGetSize(pResList) == 0) {
|
||||||
QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds);
|
QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds);
|
||||||
|
|
||||||
dsEndPut(sinkHandle, useconds);
|
dsEndPut(sinkHandle, useconds);
|
||||||
|
|
||||||
QW_ERR_RET(qwHandleTaskComplete(QW_FPARAMS(), ctx));
|
QW_ERR_RET(qwHandleTaskComplete(QW_FPARAMS(), ctx));
|
||||||
|
@ -117,19 +114,20 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t rows = pRes->info.rows;
|
for(int32_t j = 0; j < taosArrayGetSize(pResList); ++j) {
|
||||||
|
SSDataBlock *pRes = taosArrayGetP(pResList, j);
|
||||||
|
ASSERT(pRes->info.rows > 0);
|
||||||
|
|
||||||
ASSERT(pRes->info.rows > 0);
|
SInputData inputData = {.pData = pRes};
|
||||||
|
code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
|
||||||
|
if (code) {
|
||||||
|
QW_TASK_ELOG("dsPutDataBlock failed, code:%x - %s", code, tstrerror(code));
|
||||||
|
QW_ERR_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
SInputData inputData = {.pData = pRes};
|
QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", pRes->info.rows, qcontinue);
|
||||||
code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
|
|
||||||
if (code) {
|
|
||||||
QW_TASK_ELOG("dsPutDataBlock failed, code:%x - %s", code, tstrerror(code));
|
|
||||||
QW_ERR_RET(code);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", rows, qcontinue);
|
|
||||||
|
|
||||||
if (!qcontinue) {
|
if (!qcontinue) {
|
||||||
if (queryStop) {
|
if (queryStop) {
|
||||||
*queryStop = true;
|
*queryStop = true;
|
||||||
|
@ -151,6 +149,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pResList);
|
||||||
QW_RET(code);
|
QW_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -43,13 +43,17 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
|
||||||
}
|
}
|
||||||
|
|
||||||
// exec
|
// exec
|
||||||
|
SArray* pResList = taosArrayInit(4, POINTER_BYTES);
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* output = NULL;
|
SSDataBlock* output = NULL;
|
||||||
uint64_t ts = 0;
|
uint64_t ts = 0;
|
||||||
if (qExecTask(exec, &output, &ts) < 0) {
|
|
||||||
|
taosArrayClear(pResList);
|
||||||
|
if (qExecTask(exec, pResList, &ts) < 0) {
|
||||||
ASSERT(false);
|
ASSERT(false);
|
||||||
}
|
}
|
||||||
if (output == NULL) {
|
|
||||||
|
if (taosArrayGetSize(pResList) == 0) {
|
||||||
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
|
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
|
||||||
SSDataBlock block = {0};
|
SSDataBlock block = {0};
|
||||||
const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)data;
|
const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)data;
|
||||||
|
@ -65,6 +69,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
output = taosArrayGetP(pResList, 0);
|
||||||
if (output->info.type == STREAM_RETRIEVE) {
|
if (output->info.type == STREAM_RETRIEVE) {
|
||||||
if (streamBroadcastToChildren(pTask, output) < 0) {
|
if (streamBroadcastToChildren(pTask, output) < 0) {
|
||||||
// TODO
|
// TODO
|
||||||
|
@ -79,6 +84,8 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
|
||||||
block.info.childId = pTask->selfChildId;
|
block.info.childId = pTask->selfChildId;
|
||||||
taosArrayPush(pRes, &block);
|
taosArrayPush(pRes, &block);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pResList);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -98,6 +105,7 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
|
||||||
|
|
||||||
void* exec = pTask->exec.executor;
|
void* exec = pTask->exec.executor;
|
||||||
|
|
||||||
|
SArray* pResList = taosArrayInit(4, POINTER_BYTES);
|
||||||
while (1) {
|
while (1) {
|
||||||
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
||||||
if (pRes == NULL) {
|
if (pRes == NULL) {
|
||||||
|
@ -107,14 +115,17 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
|
||||||
|
|
||||||
int32_t batchCnt = 0;
|
int32_t batchCnt = 0;
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* output = NULL;
|
uint64_t ts = 0;
|
||||||
uint64_t ts = 0;
|
taosArrayClear(pResList);
|
||||||
if (qExecTask(exec, &output, &ts) < 0) {
|
if (qExecTask(exec, pResList, &ts) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
if (output == NULL) break;
|
|
||||||
|
if (taosArrayGetSize(pResList) == 0) break;
|
||||||
|
|
||||||
SSDataBlock block = {0};
|
SSDataBlock block = {0};
|
||||||
|
SSDataBlock* output = taosArrayGetP(pResList, 0);
|
||||||
|
|
||||||
assignOneDataBlock(&block, output);
|
assignOneDataBlock(&block, output);
|
||||||
block.info.childId = pTask->selfChildId;
|
block.info.childId = pTask->selfChildId;
|
||||||
taosArrayPush(pRes, &block);
|
taosArrayPush(pRes, &block);
|
||||||
|
|
Loading…
Reference in New Issue