diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h
index 88f308710e..42cc637a6c 100644
--- a/source/libs/executor/inc/executorInt.h
+++ b/source/libs/executor/inc/executorInt.h
@@ -20,6 +20,8 @@
extern "C" {
#endif
+extern int32_t exchangeObjRefPool;
+
typedef struct {
char* pData;
bool isNull;
diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h
index 19467d35d8..c772794088 100644
--- a/source/libs/executor/inc/executorimpl.h
+++ b/source/libs/executor/inc/executorimpl.h
@@ -181,7 +181,6 @@ typedef struct SExecTaskInfo {
STaskCostInfo cost;
int64_t owner; // if it is in execution
int32_t code;
-// uint64_t totalRows; // total number of rows
struct {
char *tablename;
char *dbname;
@@ -222,10 +221,10 @@ typedef struct STaskRuntimeEnv {
} STaskRuntimeEnv;
enum {
- OP_NOT_OPENED = 0x0,
- OP_OPENED = 0x1,
+ OP_NOT_OPENED = 0x0,
+ OP_OPENED = 0x1,
OP_RES_TO_RETURN = 0x5,
- OP_EXEC_DONE = 0x9,
+ OP_EXEC_DONE = 0x9,
};
typedef struct SOperatorFpSet {
@@ -262,13 +261,12 @@ typedef enum {
} EX_SOURCE_STATUS;
typedef struct SSourceDataInfo {
- struct SExchangeInfo* pEx;
int32_t index;
SRetrieveTableRsp* pRsp;
uint64_t totalRows;
int32_t code;
EX_SOURCE_STATUS status;
- const char* id;
+ const char* taskId;
} SSourceDataInfo;
typedef struct SLoadRemoteDataInfo {
@@ -286,6 +284,7 @@ typedef struct SExchangeInfo {
bool seqLoadData; // sequential load data or not, false by default
int32_t current;
SLoadRemoteDataInfo loadInfo;
+ uint64_t self;
} SExchangeInfo;
#define COL_MATCH_FROM_COL_ID 0x1
@@ -736,6 +735,8 @@ typedef struct SJoinOperatorInfo {
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
+void doDestroyExchangeOperatorInfo(void* param);
+
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
__optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
__optr_decode_fn_t decode, __optr_explain_fn_t explain);
diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c
index c014b23953..b1125c8e80 100644
--- a/source/libs/executor/src/executorMain.c
+++ b/source/libs/executor/src/executorMain.c
@@ -13,28 +13,30 @@
* along with this program. If not, see .
*/
-#include
-#include "dataSinkMgt.h"
-#include "texception.h"
#include "os.h"
-#include "tarray.h"
-#include "tcache.h"
-#include "tglobal.h"
+#include "tref.h"
+#include "dataSinkMgt.h"
#include "tmsg.h"
#include "tudf.h"
#include "executor.h"
#include "executorimpl.h"
#include "query.h"
-#include "thash.h"
-#include "tlosertree.h"
-#include "ttypes.h"
+
+static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
+int32_t exchangeObjRefPool = -1;
+
+static void initRefPool() {
+ exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo);
+}
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
- qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) {
+ qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) {
assert(readHandle != NULL && pSubplan != NULL);
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
+ taosThreadOnce(&initPoolOnce, initRefPool);
+
int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, model);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c
index 5442cce625..be877366ef 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorimpl.c
@@ -13,6 +13,7 @@
* along with this program. If not, see .
*/
+#include "tref.h"
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
@@ -37,9 +38,6 @@
#include "vnode.h"
#define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN)
-#define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN)
-#define IS_REPEAT_SCAN(runtime) ((runtime)->scanFlag == REPEAT_SCAN)
-#define SET_MAIN_SCAN_FLAG(runtime) ((runtime)->scanFlag = MAIN_SCAN)
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
#define SDATA_BLOCK_INITIALIZER \
@@ -47,12 +45,6 @@
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
-enum {
- TS_JOIN_TS_EQUAL = 0,
- TS_JOIN_TS_NOT_EQUALS = 1,
- TS_JOIN_TAG_NOT_EQUALS = 2,
-};
-
#if 0
static UNUSED_FUNC void *u_malloc (size_t __size) {
uint32_t v = taosRand();
@@ -88,7 +80,6 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
#endif
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
-//#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList)
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)
int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; }
@@ -110,7 +101,6 @@ static void releaseQueryBuf(size_t numOfTables);
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput);
-static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
@@ -2399,28 +2389,22 @@ static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQuer
#endif
}
-// static void updateTableIdInfo(STableQueryInfo* pTableQueryInfo, SSDataBlock* pBlock, SHashObj* pTableIdInfo, int32_t
-// order) {
-// int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
-// pTableQueryInfo->lastKey = ((order == TSDB_ORDER_ASC)? pBlock->info.window.ekey:pBlock->info.window.skey) + step;
-//
-// if (pTableQueryInfo->pTable == NULL) {
-// return;
-// }
-//
-// STableIdInfo tidInfo = createTableIdInfo(pTableQueryInfo);
-// STableIdInfo *idinfo = taosHashGet(pTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid));
-// if (idinfo != NULL) {
-// assert(idinfo->tid == tidInfo.tid && idinfo->uid == tidInfo.uid);
-// idinfo->key = tidInfo.key;
-// } else {
-// taosHashPut(pTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid), &tidInfo, sizeof(STableIdInfo));
-// }
-// }
+typedef struct SFetchRspHandleWrapper {
+ uint32_t exchangeId;
+ int32_t sourceIndex;
+} SFetchRspHandleWrapper;
int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) {
- SSourceDataInfo* pSourceDataInfo = (SSourceDataInfo*)param;
- int32_t index = pSourceDataInfo->index;
+ SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*) param;
+
+ SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
+ if (pExchangeInfo == NULL) {
+ qWarn("failed to acquire exchange operator, since it may have been released");
+ return TSDB_CODE_SUCCESS;
+ }
+
+ int32_t index = pWrapper->sourceIndex;
+ SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
if (code == TSDB_CODE_SUCCESS) {
pSourceDataInfo->pRsp = pMsg->pData;
@@ -2432,13 +2416,17 @@ int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code)
pRsp->useconds = htobe64(pRsp->useconds);
ASSERT(pRsp != NULL);
- qDebug("%s fetch rsp received, index:%d, rows:%d", pSourceDataInfo->id, index, pRsp->numOfRows);
+ qDebug("%s fetch rsp received, index:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfRows);
} else {
pSourceDataInfo->code = code;
}
pSourceDataInfo->status = EX_SOURCE_DATA_READY;
- tsem_post(&pSourceDataInfo->pEx->ready);
+
+ tsem_post(&pExchangeInfo->ready);
+ taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
+
+ taosMemoryFree(pWrapper);
return TSDB_CODE_SUCCESS;
}
@@ -2487,9 +2475,9 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, sourceIndex, totalSources);
pMsg->header.vgId = htonl(pSource->addr.nodeId);
- pMsg->sId = htobe64(pSource->schedId);
- pMsg->taskId = htobe64(pSource->taskId);
- pMsg->queryId = htobe64(pTaskInfo->id.queryId);
+ pMsg->sId = htobe64(pSource->schedId);
+ pMsg->taskId = htobe64(pSource->taskId);
+ pMsg->queryId = htobe64(pTaskInfo->id.queryId);
// send the fetch remote task result reques
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
@@ -2500,11 +2488,15 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
return pTaskInfo->code;
}
- pMsgSendInfo->param = pDataInfo;
+ SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
+ pWrapper->exchangeId = pExchangeInfo->self;
+ pWrapper->sourceIndex = sourceIndex;
+
+ pMsgSendInfo->param = pWrapper;
pMsgSendInfo->msgInfo.pData = pMsg;
pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
pMsgSendInfo->msgType = TDMT_VND_FETCH;
- pMsgSendInfo->fp = loadRemoteDataCallback;
+ pMsgSendInfo->fp = loadRemoteDataCallback;
int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
@@ -2630,9 +2622,6 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
while (1) {
int32_t completed = 0;
- qDebug("%s current ready sources:%ld", GET_TASKID(pTaskInfo), pExchangeInfo->ready.__align);
- tsem_wait(&pExchangeInfo->ready);
-
for (int32_t i = 0; i < totalSources; ++i) {
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
@@ -2698,30 +2687,10 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
}
}
- int32_t comp = 0;
- for(int32_t j = 0; j < totalSources; ++j) {
- SSourceDataInfo* pDataInfo1 = taosArrayGet(pExchangeInfo->pSourceDataInfo, j);
- if (pDataInfo1->status == EX_SOURCE_DATA_EXHAUSTED) {
- comp += 1;
- }
- }
-
- if (comp == totalSources) {
- setAllSourcesCompleted(pOperator, startTs);
- }
-
return pExchangeInfo->pResult;
}
- int32_t comp = 0;
- for(int32_t j = 0; j < totalSources; ++j) {
- SSourceDataInfo* pDataInfo1 = taosArrayGet(pExchangeInfo->pSourceDataInfo, j);
- if (pDataInfo1->status == EX_SOURCE_DATA_EXHAUSTED) {
- comp += 1;
- }
- }
-
- if (comp == totalSources) {
+ if (completed == totalSources) {
return setAllSourcesCompleted(pOperator, startTs);
}
}
@@ -2748,12 +2717,13 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
}
int64_t endTs = taosGetTimestampUs();
- qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%" PRId64, GET_TASKID(pTaskInfo),
- totalSources, endTs - startTs);
+ qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
+ totalSources, (endTs - startTs)/1000.0);
pOperator->status = OP_RES_TO_RETURN;
pOperator->cost.openCost = taosGetTimestampUs() - startTs;
+ tsem_wait(&pExchangeInfo->ready);
return TSDB_CODE_SUCCESS;
}
@@ -2878,11 +2848,10 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const
for (int32_t i = 0; i < numOfSources; ++i) {
SSourceDataInfo dataInfo = {0};
dataInfo.status = EX_SOURCE_DATA_NOT_READY;
- dataInfo.pEx = pInfo;
- dataInfo.index = i;
- dataInfo.id = id;
- void* ret = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
- if (ret == NULL) {
+ dataInfo.taskId = id;
+ dataInfo.index = i;
+ SSourceDataInfo *pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
+ if (pDs == NULL) {
taosArrayDestroy(pInfo->pSourceDataInfo);
return TSDB_CODE_OUT_OF_MEMORY;
}
@@ -2910,6 +2879,8 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
taosArrayPush(pInfo->pSources, pNode);
}
+ pInfo->self = taosAddRef(exchangeObjRefPool, pInfo);
+
return initDataSource(numOfSources, pInfo, id);
}
@@ -2925,18 +2896,18 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
goto _error;
}
- pInfo->seqLoadData = false;
- pInfo->pTransporter = pTransporter;
- pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
tsem_init(&pInfo->ready, 0, 0);
- pOperator->name = "ExchangeOperator";
+ pInfo->seqLoadData = false;
+ pInfo->pTransporter = pTransporter;
+ pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
+ pOperator->name = "ExchangeOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
- pOperator->blocking = false;
- pOperator->status = OP_NOT_OPENED;
- pOperator->info = pInfo;
+ pOperator->blocking = false;
+ pOperator->status = OP_NOT_OPENED;
+ pOperator->info = pInfo;
pOperator->numOfExprs = pInfo->pResult->info.numOfCols;
- pOperator->pTaskInfo = pTaskInfo;
+ pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
destroyExchangeOperatorInfo, NULL, NULL, NULL);
@@ -2944,7 +2915,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
_error:
if (pInfo != NULL) {
- destroyExchangeOperatorInfo(pInfo, LIST_LENGTH(pExNode->pSrcEndPoints));
+ doDestroyExchangeOperatorInfo(pInfo);
}
taosMemoryFreeClear(pInfo);
@@ -4091,6 +4062,12 @@ static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) {
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
SExchangeInfo* pExInfo = (SExchangeInfo*)param;
+ taosRemoveRef(exchangeObjRefPool, pExInfo->self);
+}
+
+void doDestroyExchangeOperatorInfo(void* param) {
+ SExchangeInfo* pExInfo = (SExchangeInfo*) param;
+
taosArrayDestroy(pExInfo->pSources);
taosArrayDestroy(pExInfo->pSourceDataInfo);
if (pExInfo->pResult != NULL) {
diff --git a/source/util/src/tcache.c b/source/util/src/tcache.c
index 10a5475555..1331c37507 100644
--- a/source/util/src/tcache.c
+++ b/source/util/src/tcache.c
@@ -53,19 +53,19 @@ typedef struct SCacheEntry {
SCacheNode *next;
} SCacheEntry;
-typedef struct STrashElem {
+struct STrashElem {
struct STrashElem *prev;
struct STrashElem *next;
SCacheNode *pData;
-} STrashElem;
+};
-typedef struct SCacheIter {
+struct SCacheIter {
SCacheObj *pCacheObj;
SCacheNode **pCurrent;
int32_t entryIndex;
int32_t index;
int32_t numOfObj;
-} SCacheIter;
+};
/*
* to accommodate the old data which has the same key value of new one in hashList