fix(query): add refId for exchange operator.

This commit is contained in:
Haojun Liao 2022-06-15 16:26:43 +08:00
parent 83ee553615
commit ec97183737
5 changed files with 80 additions and 98 deletions

View File

@ -20,6 +20,8 @@
extern "C" {
#endif
extern int32_t exchangeObjRefPool;
typedef struct {
char* pData;
bool isNull;

View File

@ -181,7 +181,6 @@ typedef struct SExecTaskInfo {
STaskCostInfo cost;
int64_t owner; // if it is in execution
int32_t code;
// uint64_t totalRows; // total number of rows
struct {
char *tablename;
char *dbname;
@ -222,10 +221,10 @@ typedef struct STaskRuntimeEnv {
} STaskRuntimeEnv;
enum {
OP_NOT_OPENED = 0x0,
OP_OPENED = 0x1,
OP_NOT_OPENED = 0x0,
OP_OPENED = 0x1,
OP_RES_TO_RETURN = 0x5,
OP_EXEC_DONE = 0x9,
OP_EXEC_DONE = 0x9,
};
typedef struct SOperatorFpSet {
@ -262,13 +261,12 @@ typedef enum {
} EX_SOURCE_STATUS;
typedef struct SSourceDataInfo {
struct SExchangeInfo* pEx;
int32_t index;
SRetrieveTableRsp* pRsp;
uint64_t totalRows;
int32_t code;
EX_SOURCE_STATUS status;
const char* id;
const char* taskId;
} SSourceDataInfo;
typedef struct SLoadRemoteDataInfo {
@ -286,6 +284,7 @@ typedef struct SExchangeInfo {
bool seqLoadData; // sequential load data or not, false by default
int32_t current;
SLoadRemoteDataInfo loadInfo;
uint64_t self;
} SExchangeInfo;
#define COL_MATCH_FROM_COL_ID 0x1
@ -736,6 +735,8 @@ typedef struct SJoinOperatorInfo {
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
void doDestroyExchangeOperatorInfo(void* param);
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
__optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
__optr_decode_fn_t decode, __optr_explain_fn_t explain);

View File

@ -13,28 +13,30 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <vnode.h>
#include "dataSinkMgt.h"
#include "texception.h"
#include "os.h"
#include "tarray.h"
#include "tcache.h"
#include "tglobal.h"
#include "tref.h"
#include "dataSinkMgt.h"
#include "tmsg.h"
#include "tudf.h"
#include "executor.h"
#include "executorimpl.h"
#include "query.h"
#include "thash.h"
#include "tlosertree.h"
#include "ttypes.h"
static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
int32_t exchangeObjRefPool = -1;
static void initRefPool() {
exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo);
}
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) {
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) {
assert(readHandle != NULL && pSubplan != NULL);
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
taosThreadOnce(&initPoolOnce, initRefPool);
int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, model);
if (code != TSDB_CODE_SUCCESS) {
goto _error;

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tref.h"
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
@ -37,9 +38,6 @@
#include "vnode.h"
#define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN)
#define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN)
#define IS_REPEAT_SCAN(runtime) ((runtime)->scanFlag == REPEAT_SCAN)
#define SET_MAIN_SCAN_FLAG(runtime) ((runtime)->scanFlag = MAIN_SCAN)
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
#define SDATA_BLOCK_INITIALIZER \
@ -47,12 +45,6 @@
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
enum {
TS_JOIN_TS_EQUAL = 0,
TS_JOIN_TS_NOT_EQUALS = 1,
TS_JOIN_TAG_NOT_EQUALS = 2,
};
#if 0
static UNUSED_FUNC void *u_malloc (size_t __size) {
uint32_t v = taosRand();
@ -88,7 +80,6 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
#endif
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
//#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList)
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)
int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; }
@ -110,7 +101,6 @@ static void releaseQueryBuf(size_t numOfTables);
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput);
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
@ -2399,28 +2389,22 @@ static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQuer
#endif
}
// static void updateTableIdInfo(STableQueryInfo* pTableQueryInfo, SSDataBlock* pBlock, SHashObj* pTableIdInfo, int32_t
// order) {
// int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
// pTableQueryInfo->lastKey = ((order == TSDB_ORDER_ASC)? pBlock->info.window.ekey:pBlock->info.window.skey) + step;
//
// if (pTableQueryInfo->pTable == NULL) {
// return;
// }
//
// STableIdInfo tidInfo = createTableIdInfo(pTableQueryInfo);
// STableIdInfo *idinfo = taosHashGet(pTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid));
// if (idinfo != NULL) {
// assert(idinfo->tid == tidInfo.tid && idinfo->uid == tidInfo.uid);
// idinfo->key = tidInfo.key;
// } else {
// taosHashPut(pTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid), &tidInfo, sizeof(STableIdInfo));
// }
// }
typedef struct SFetchRspHandleWrapper {
uint32_t exchangeId;
int32_t sourceIndex;
} SFetchRspHandleWrapper;
int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) {
SSourceDataInfo* pSourceDataInfo = (SSourceDataInfo*)param;
int32_t index = pSourceDataInfo->index;
SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*) param;
SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
if (pExchangeInfo == NULL) {
qWarn("failed to acquire exchange operator, since it may have been released");
return TSDB_CODE_SUCCESS;
}
int32_t index = pWrapper->sourceIndex;
SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
if (code == TSDB_CODE_SUCCESS) {
pSourceDataInfo->pRsp = pMsg->pData;
@ -2432,13 +2416,17 @@ int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code)
pRsp->useconds = htobe64(pRsp->useconds);
ASSERT(pRsp != NULL);
qDebug("%s fetch rsp received, index:%d, rows:%d", pSourceDataInfo->id, index, pRsp->numOfRows);
qDebug("%s fetch rsp received, index:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfRows);
} else {
pSourceDataInfo->code = code;
}
pSourceDataInfo->status = EX_SOURCE_DATA_READY;
tsem_post(&pSourceDataInfo->pEx->ready);
tsem_post(&pExchangeInfo->ready);
taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
taosMemoryFree(pWrapper);
return TSDB_CODE_SUCCESS;
}
@ -2487,9 +2475,9 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, sourceIndex, totalSources);
pMsg->header.vgId = htonl(pSource->addr.nodeId);
pMsg->sId = htobe64(pSource->schedId);
pMsg->taskId = htobe64(pSource->taskId);
pMsg->queryId = htobe64(pTaskInfo->id.queryId);
pMsg->sId = htobe64(pSource->schedId);
pMsg->taskId = htobe64(pSource->taskId);
pMsg->queryId = htobe64(pTaskInfo->id.queryId);
// send the fetch remote task result reques
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
@ -2500,11 +2488,15 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
return pTaskInfo->code;
}
pMsgSendInfo->param = pDataInfo;
SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
pWrapper->exchangeId = pExchangeInfo->self;
pWrapper->sourceIndex = sourceIndex;
pMsgSendInfo->param = pWrapper;
pMsgSendInfo->msgInfo.pData = pMsg;
pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
pMsgSendInfo->msgType = TDMT_VND_FETCH;
pMsgSendInfo->fp = loadRemoteDataCallback;
pMsgSendInfo->fp = loadRemoteDataCallback;
int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
@ -2630,9 +2622,6 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
while (1) {
int32_t completed = 0;
qDebug("%s current ready sources:%ld", GET_TASKID(pTaskInfo), pExchangeInfo->ready.__align);
tsem_wait(&pExchangeInfo->ready);
for (int32_t i = 0; i < totalSources; ++i) {
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
@ -2698,30 +2687,10 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
}
}
int32_t comp = 0;
for(int32_t j = 0; j < totalSources; ++j) {
SSourceDataInfo* pDataInfo1 = taosArrayGet(pExchangeInfo->pSourceDataInfo, j);
if (pDataInfo1->status == EX_SOURCE_DATA_EXHAUSTED) {
comp += 1;
}
}
if (comp == totalSources) {
setAllSourcesCompleted(pOperator, startTs);
}
return pExchangeInfo->pResult;
}
int32_t comp = 0;
for(int32_t j = 0; j < totalSources; ++j) {
SSourceDataInfo* pDataInfo1 = taosArrayGet(pExchangeInfo->pSourceDataInfo, j);
if (pDataInfo1->status == EX_SOURCE_DATA_EXHAUSTED) {
comp += 1;
}
}
if (comp == totalSources) {
if (completed == totalSources) {
return setAllSourcesCompleted(pOperator, startTs);
}
}
@ -2748,12 +2717,13 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
}
int64_t endTs = taosGetTimestampUs();
qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%" PRId64, GET_TASKID(pTaskInfo),
totalSources, endTs - startTs);
qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
totalSources, (endTs - startTs)/1000.0);
pOperator->status = OP_RES_TO_RETURN;
pOperator->cost.openCost = taosGetTimestampUs() - startTs;
tsem_wait(&pExchangeInfo->ready);
return TSDB_CODE_SUCCESS;
}
@ -2878,11 +2848,10 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const
for (int32_t i = 0; i < numOfSources; ++i) {
SSourceDataInfo dataInfo = {0};
dataInfo.status = EX_SOURCE_DATA_NOT_READY;
dataInfo.pEx = pInfo;
dataInfo.index = i;
dataInfo.id = id;
void* ret = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
if (ret == NULL) {
dataInfo.taskId = id;
dataInfo.index = i;
SSourceDataInfo *pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
if (pDs == NULL) {
taosArrayDestroy(pInfo->pSourceDataInfo);
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -2910,6 +2879,8 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
taosArrayPush(pInfo->pSources, pNode);
}
pInfo->self = taosAddRef(exchangeObjRefPool, pInfo);
return initDataSource(numOfSources, pInfo, id);
}
@ -2925,18 +2896,18 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
goto _error;
}
pInfo->seqLoadData = false;
pInfo->pTransporter = pTransporter;
pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
tsem_init(&pInfo->ready, 0, 0);
pOperator->name = "ExchangeOperator";
pInfo->seqLoadData = false;
pInfo->pTransporter = pTransporter;
pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
pOperator->name = "ExchangeOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = pInfo->pResult->info.numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
destroyExchangeOperatorInfo, NULL, NULL, NULL);
@ -2944,7 +2915,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
_error:
if (pInfo != NULL) {
destroyExchangeOperatorInfo(pInfo, LIST_LENGTH(pExNode->pSrcEndPoints));
doDestroyExchangeOperatorInfo(pInfo);
}
taosMemoryFreeClear(pInfo);
@ -4091,6 +4062,12 @@ static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) {
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
SExchangeInfo* pExInfo = (SExchangeInfo*)param;
taosRemoveRef(exchangeObjRefPool, pExInfo->self);
}
void doDestroyExchangeOperatorInfo(void* param) {
SExchangeInfo* pExInfo = (SExchangeInfo*) param;
taosArrayDestroy(pExInfo->pSources);
taosArrayDestroy(pExInfo->pSourceDataInfo);
if (pExInfo->pResult != NULL) {

View File

@ -53,19 +53,19 @@ typedef struct SCacheEntry {
SCacheNode *next;
} SCacheEntry;
typedef struct STrashElem {
struct STrashElem {
struct STrashElem *prev;
struct STrashElem *next;
SCacheNode *pData;
} STrashElem;
};
typedef struct SCacheIter {
struct SCacheIter {
SCacheObj *pCacheObj;
SCacheNode **pCurrent;
int32_t entryIndex;
int32_t index;
int32_t numOfObj;
} SCacheIter;
};
/*
* to accommodate the old data which has the same key value of new one in hashList