Merge pull request #13174 from taosdata/feature/qnode

fix: fix memory leak
This commit is contained in:
dapan1121 2022-05-28 22:44:00 +08:00 committed by GitHub
commit 5da1ead67b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 246 additions and 87 deletions

View File

@ -26,14 +26,17 @@ extern "C" {
typedef struct SQnode SQnode;
typedef struct {
int64_t numOfStartTask;
int64_t numOfStopTask;
int64_t numOfRecvedFetch;
int64_t numOfSentHb;
int64_t numOfSentFetch;
int64_t numOfTaskInQueue;
int64_t numOfProcessedQuery;
int64_t numOfProcessedCQuery;
int64_t numOfProcessedFetch;
int64_t numOfProcessedDrop;
int64_t memSizeInCache;
int64_t dataSizeSend;
int64_t dataSizeRecv;
int64_t numOfQueryInQueue;
int64_t numOfFetchInQueue;
int64_t numOfErrors;
int64_t waitTimeInQueryQUeue;
int64_t waitTimeInFetchQUeue;
} SQnodeLoad;
typedef struct {
@ -71,10 +74,10 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad);
* @param pQnode The qnode object.
* @param pMsg The request message
*/
int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg);
int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg);
#ifdef __cplusplus
}
#endif
#endif /*_TD_QNODE_H_*/
#endif /*_TD_QNODE_H_*/

View File

@ -52,22 +52,24 @@ typedef struct {
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb);
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
void qWorkerDestroy(void **qWorkerMgmt);
int64_t qWorkerGetWaitTimeInQueue(void *qWorkerMgmt, EQueueType type);
#ifdef __cplusplus
}
#endif

View File

@ -69,6 +69,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_DUP_KEY TAOS_DEF_ERROR_CODE(0, 0x0027)
#define TSDB_CODE_NEED_RETRY TAOS_DEF_ERROR_CODE(0, 0x0028)
#define TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE TAOS_DEF_ERROR_CODE(0, 0x0029)
#define TSDB_CODE_INVALID_TIMESTAMP TAOS_DEF_ERROR_CODE(0, 0x0030)
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040)
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041)

View File

@ -46,6 +46,7 @@ typedef struct {
void *ahandle;
int32_t workerId;
int32_t threadNum;
int64_t timestamp;
} SQueueInfo;
typedef enum {
@ -80,7 +81,7 @@ int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle);
void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue);
int32_t taosGetQueueNumber(STaosQset *qset);
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp);
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, int64_t *ts, void **ahandle, FItem *itemFp);
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp);
void taosResetQsetThread(STaosQset *qset, void *pItem);

View File

@ -1249,6 +1249,8 @@ void resetConnectDB(STscObj* pTscObj) {
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4) {
assert(pResultInfo != NULL && pRsp != NULL);
taosMemoryFreeClear(pResultInfo->pRspMsg);
pResultInfo->pRspMsg = (const char*)pRsp;
pResultInfo->pData = (void*)pRsp->data;
pResultInfo->numOfRows = htonl(pRsp->numOfRows);

View File

@ -521,10 +521,10 @@ int32_t convertStringToTimestamp(int16_t type, char *inputData, int64_t timePrec
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_VARBINARY) {
newColData = taosMemoryCalloc(1, charLen + 1);
memcpy(newColData, varDataVal(inputData), charLen);
bool ret = taosParseTime(newColData, timeVal, charLen, (int32_t)timePrec, tsDaylight);
int32_t ret = taosParseTime(newColData, timeVal, charLen, (int32_t)timePrec, tsDaylight);
if (ret != TSDB_CODE_SUCCESS) {
taosMemoryFree(newColData);
return ret;
return TSDB_CODE_INVALID_TIMESTAMP;
}
taosMemoryFree(newColData);
} else if (type == TSDB_DATA_TYPE_NCHAR) {

View File

@ -16,7 +16,11 @@
#define _DEFAULT_SOURCE
#include "qmInt.h"
void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {}
void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {
SQnodeLoad qload = {0};
qndGetLoad(pMgmt->pQnode, &qload);
}
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonQmInfo qmInfo = {0};

View File

@ -36,7 +36,7 @@ static void qmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
code = qmProcessGetMonitorInfoReq(pMgmt, pMsg);
break;
default:
code = qndProcessQueryMsg(pMgmt->pQnode, pMsg);
code = qndProcessQueryMsg(pMgmt->pQnode, pInfo->timestamp, pMsg);
break;
}

View File

@ -62,7 +62,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
dmProcessNetTestReq(pDnode, pRpc);
return;
} else if (pRpc->msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || pRpc->msgType == TDMT_VND_FETCH_RSP) {
qWorkerProcessFetchRsp(NULL, NULL, pRpc);
qWorkerProcessFetchRsp(NULL, NULL, pRpc, 0);
return;
} else if (pRpc->msgType == TDMT_MND_STATUS_RSP && pEpSet != NULL) {
dmSetMnodeEpSet(&pDnode->data, pEpSet);

View File

@ -26,19 +26,19 @@ int32_t mndProcessQueryMsg(SRpcMsg *pMsg) {
mTrace("msg:%p, in query queue is processing", pMsg);
switch (pMsg->msgType) {
case TDMT_VND_QUERY:
code = qWorkerProcessQueryMsg(&handle, pMnode->pQuery, pMsg);
code = qWorkerProcessQueryMsg(&handle, pMnode->pQuery, pMsg, 0);
break;
case TDMT_VND_QUERY_CONTINUE:
code = qWorkerProcessCQueryMsg(&handle, pMnode->pQuery, pMsg);
code = qWorkerProcessCQueryMsg(&handle, pMnode->pQuery, pMsg, 0);
break;
case TDMT_VND_FETCH:
code = qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, pMsg);
code = qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, pMsg, 0);
break;
case TDMT_VND_DROP_TASK:
code = qWorkerProcessDropMsg(pMnode, pMnode->pQuery, pMsg);
code = qWorkerProcessDropMsg(pMnode, pMnode->pQuery, pMsg, 0);
break;
case TDMT_VND_QUERY_HEARTBEAT:
code = qWorkerProcessHbMsg(pMnode, pMnode->pQuery, pMsg);
code = qWorkerProcessHbMsg(pMnode, pMnode->pQuery, pMsg, 0);
break;
default:
terrno = TSDB_CODE_VND_APP_ERROR;

View File

@ -40,37 +40,46 @@ void qndClose(SQnode *pQnode) {
taosMemoryFree(pQnode);
}
int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { return 0; }
int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) {
SMsgCb* pCb = &pQnode->msgCb;
int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) {
pLoad->numOfQueryInQueue = pCb->qsizeFp(pCb->mgmt, pQnode->qndId, QUERY_QUEUE);
pLoad->numOfFetchInQueue = pCb->qsizeFp(pCb->mgmt, pQnode->qndId, FETCH_QUEUE);
pLoad->waitTimeInQueryQUeue = qWorkerGetWaitTimeInQueue(pQnode->pQuery, QUERY_QUEUE);
pLoad->waitTimeInFetchQUeue = qWorkerGetWaitTimeInQueue(pQnode->pQuery, FETCH_QUEUE);
return 0;
}
int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) {
int32_t code = -1;
SReadHandle handle = {.pMsgCb = &pQnode->msgCb};
qTrace("message in qnode queue is processing");
switch (pMsg->msgType) {
case TDMT_VND_QUERY:
code = qWorkerProcessQueryMsg(&handle, pQnode->pQuery, pMsg);
code = qWorkerProcessQueryMsg(&handle, pQnode->pQuery, pMsg, ts);
break;
case TDMT_VND_QUERY_CONTINUE:
code = qWorkerProcessCQueryMsg(&handle, pQnode->pQuery, pMsg);
code = qWorkerProcessCQueryMsg(&handle, pQnode->pQuery, pMsg, ts);
break;
case TDMT_VND_FETCH:
code = qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg);
code = qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg, ts);
break;
case TDMT_VND_FETCH_RSP:
code = qWorkerProcessFetchRsp(pQnode, pQnode->pQuery, pMsg);
code = qWorkerProcessFetchRsp(pQnode, pQnode->pQuery, pMsg, ts);
break;
case TDMT_VND_CANCEL_TASK:
code = qWorkerProcessCancelMsg(pQnode, pQnode->pQuery, pMsg);
code = qWorkerProcessCancelMsg(pQnode, pQnode->pQuery, pMsg, ts);
break;
case TDMT_VND_DROP_TASK:
code = qWorkerProcessDropMsg(pQnode, pQnode->pQuery, pMsg);
code = qWorkerProcessDropMsg(pQnode, pQnode->pQuery, pMsg, ts);
break;
case TDMT_VND_CONSUME:
// code = tqProcessConsumeReq(pQnode->pTq, pMsg);
// break;
case TDMT_VND_QUERY_HEARTBEAT:
code = qWorkerProcessHbMsg(pQnode, pQnode->pQuery, pMsg);
code = qWorkerProcessHbMsg(pQnode, pQnode->pQuery, pMsg, ts);
break;
default:
qError("unknown msg type:%d in qnode queue", pMsg->msgType);

View File

@ -191,9 +191,9 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
switch (pMsg->msgType) {
case TDMT_VND_QUERY:
return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg);
return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
case TDMT_VND_QUERY_CONTINUE:
return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg);
return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
default:
vError("unknown msg type:%d in query queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR;
@ -206,13 +206,16 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
switch (pMsg->msgType) {
case TDMT_VND_FETCH:
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg);
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_VND_FETCH_RSP:
return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg);
return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_VND_CANCEL_TASK:
return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg);
return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_VND_DROP_TASK:
return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg);
return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_VND_QUERY_HEARTBEAT:
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_VND_TABLE_META:
return vnodeGetTableMeta(pVnode, pMsg);
case TDMT_VND_CONSUME:
@ -231,9 +234,6 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
case TDMT_VND_TASK_RECOVER_RSP:
return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
case TDMT_VND_QUERY_HEARTBEAT:
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg);
default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR;

View File

@ -71,6 +71,16 @@ void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
qDebug("empty db vgroup");
}
if (pResult->pDbInfo && taosArrayGetSize(pResult->pDbInfo) > 0) {
num = taosArrayGetSize(pResult->pDbInfo);
for (int32_t i = 0; i < num; ++i) {
SDbInfo *pDb = taosArrayGet(pResult->pDbInfo, i);
qDebug("db %d dbInfo: vgVer:%d, tbNum:%d, dbId:%" PRIx64, i, pDb->vgVer, pDb->tbNum, pDb->dbId);
}
} else {
qDebug("empty db info");
}
if (pResult->pTableHash && taosArrayGetSize(pResult->pTableHash) > 0) {
num = taosArrayGetSize(pResult->pTableHash);
for (int32_t i = 0; i < num; ++i) {
@ -127,6 +137,7 @@ int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps
SCatalogReq req = {0};
req.pTableMeta = taosArrayInit(2, sizeof(SName));
req.pDbVgroup = taosArrayInit(2, TSDB_DB_FNAME_LEN);
req.pDbInfo = taosArrayInit(2, TSDB_DB_FNAME_LEN);
req.pTableHash = taosArrayInit(2, sizeof(SName));
req.pUdf = taosArrayInit(2, TSDB_FUNC_NAME_LEN);
req.pDbCfg = taosArrayInit(2, TSDB_DB_FNAME_LEN);
@ -149,9 +160,11 @@ int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps
strcpy(dbFName, "1.db1");
taosArrayPush(req.pDbVgroup, dbFName);
taosArrayPush(req.pDbCfg, dbFName);
taosArrayPush(req.pDbInfo, dbFName);
strcpy(dbFName, "1.db2");
taosArrayPush(req.pDbVgroup, dbFName);
taosArrayPush(req.pDbCfg, dbFName);
taosArrayPush(req.pDbInfo, dbFName);
strcpy(funcName, "udf1");
taosArrayPush(req.pUdf, funcName);

View File

@ -560,8 +560,10 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pAggNode->pAggFuncs->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
if (pAggNode->pAggFuncs) {
EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pAggNode->pAggFuncs->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pAggNode->node.pOutputDataBlockDesc->totalRowSize);
if (pAggNode->pGroupKeys) {
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);

View File

@ -2601,6 +2601,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
pStart += sizeof(int32_t) * numOfRows;
if (colLen[i] > 0) {
taosMemoryFreeClear(pColInfoData->pData);
pColInfoData->pData = taosMemoryMalloc(colLen[i]);
}
} else {
@ -2758,6 +2759,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
pExchangeInfo->loadInfo.totalRows);
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
completed += 1;
taosMemoryFreeClear(pDataInfo->pRsp);
continue;
}
@ -2765,6 +2767,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
code = setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data,
pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
if (code != 0) {
taosMemoryFreeClear(pDataInfo->pRsp);
goto _error;
}
@ -2785,10 +2788,12 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pDataInfo->pRsp);
goto _error;
}
}
taosMemoryFreeClear(pDataInfo->pRsp);
return pExchangeInfo->pResult;
}
@ -2890,7 +2895,8 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
pDataInfo->totalRows, pLoadInfo->totalRows);
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
pExchangeInfo->current += 1;
pExchangeInfo->current += 1;
taosMemoryFreeClear(pDataInfo->pRsp);
continue;
}
@ -2916,6 +2922,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
}
pOperator->resultInfo.totalRows += pRes->info.rows;
taosMemoryFreeClear(pDataInfo->pRsp);
return pExchangeInfo->pResult;
}
}

View File

@ -106,6 +106,7 @@ static int32_t setValueByBindParam(SValueNode* pVal, TAOS_MULTI_BIND* pParam) {
}
varDataSetLen(pVal->datum.p, pVal->node.resType.bytes);
strncpy(varDataVal(pVal->datum.p), (const char*)pParam->buffer, pVal->node.resType.bytes);
pVal->node.resType.bytes += VARSTR_HEADER_SIZE;
break;
case TSDB_DATA_TYPE_NCHAR: {
pVal->node.resType.bytes *= TSDB_NCHAR_SIZE;
@ -120,7 +121,7 @@ static int32_t setValueByBindParam(SValueNode* pVal, TAOS_MULTI_BIND* pParam) {
return errno;
}
varDataSetLen(pVal->datum.p, output);
pVal->node.resType.bytes = output;
pVal->node.resType.bytes = output + VARSTR_HEADER_SIZE;
break;
}
case TSDB_DATA_TYPE_TIMESTAMP:

View File

@ -145,6 +145,15 @@ typedef struct SQWSchStatus {
SHashObj *tasksHash; // key:queryId+taskId, value: SQWTaskStatus
} SQWSchStatus;
typedef struct SQWWaitTimeStat {
uint64_t num;
uint64_t total;
} SQWWaitTimeStat;
typedef struct SQWStat {
SQWWaitTimeStat msgWait[2];
} SQWStat;
// Qnode/Vnode level task management
typedef struct SQWorker {
int64_t refId;
@ -155,9 +164,10 @@ typedef struct SQWorker {
tmr_h hbTimer;
SRWLatch schLock;
// SRWLatch ctxLock;
SHashObj *schHash; // key: schedulerId, value: SQWSchStatus
SHashObj *ctxHash; // key: queryId+taskId, value: SQWTaskCtx
SMsgCb msgCb;
SHashObj *schHash; // key: schedulerId, value: SQWSchStatus
SHashObj *ctxHash; // key: queryId+taskId, value: SQWTaskCtx
SMsgCb msgCb;
SQWStat stat;
} SQWorker;
typedef struct SQWorkerMgmt {
@ -322,6 +332,8 @@ int32_t qwDropTask(QW_FPARAMS_DEF);
void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx);
int32_t qwOpenRef(void);
void qwSetHbParam(int64_t refId, SQWHbParam **pParam);
int32_t qwUpdateWaitTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type);
int64_t qwGetWaitTimeInQueue(SQWorker *mgmt, EQueueType type);
void qwDbgDumpMgmtInfo(SQWorker *mgmt);
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore);

View File

@ -248,7 +248,7 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
@ -257,6 +257,8 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SSubQueryMsg *msg = pMsg->pCont;
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
qwUpdateWaitTimeInQueue(mgmt, ts, QUERY_QUEUE);
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
@ -286,7 +288,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
int32_t code = 0;
int8_t status = 0;
bool queryDone = false;
@ -295,6 +297,8 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWTaskCtx * handles = NULL;
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
qwUpdateWaitTimeInQueue(mgmt, ts, QUERY_QUEUE);
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
@ -316,7 +320,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
@ -324,6 +328,8 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SResFetchReq *msg = pMsg->pCont;
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
qwUpdateWaitTimeInQueue(mgmt, ts, FETCH_QUEUE);
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
@ -349,13 +355,16 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
qwUpdateWaitTimeInQueue(mgmt, ts, FETCH_QUEUE);
qProcessFetchRsp(NULL, pMsg, NULL);
pMsg->pCont = NULL;
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
@ -363,6 +372,9 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
int32_t code = 0;
STaskCancelReq *msg = pMsg->pCont;
qwUpdateWaitTimeInQueue(mgmt, ts, FETCH_QUEUE);
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid task cancel msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
@ -390,7 +402,7 @@ _return:
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
@ -399,6 +411,8 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
STaskDropReq *msg = pMsg->pCont;
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
qwUpdateWaitTimeInQueue(mgmt, ts, FETCH_QUEUE);
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
@ -429,7 +443,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
@ -438,6 +452,8 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SSchedulerHbReq req = {0};
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
qwUpdateWaitTimeInQueue(mgmt, ts, FETCH_QUEUE);
if (NULL == pMsg->pCont) {
QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);

View File

@ -499,4 +499,43 @@ int32_t qwOpenRef(void) {
return TSDB_CODE_SUCCESS;
}
int32_t qwUpdateWaitTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type) {
if (ts <= 0) {
return TSDB_CODE_SUCCESS;
}
int64_t duration = taosGetTimestampUs() - ts;
switch (type) {
case QUERY_QUEUE:
++mgmt->stat.msgWait[0].num;
mgmt->stat.msgWait[0].total += duration;
break;
case FETCH_QUEUE:
++mgmt->stat.msgWait[1].num;
mgmt->stat.msgWait[1].total += duration;
break;
default:
qError("unsupported queue type %d", type);
return TSDB_CODE_APP_ERROR;
}
return TSDB_CODE_SUCCESS;
}
int64_t qwGetWaitTimeInQueue(SQWorker *mgmt, EQueueType type) {
SQWWaitTimeStat *pStat = NULL;
switch (type) {
case QUERY_QUEUE:
pStat = &mgmt->stat.msgWait[0];
return pStat->num ? (pStat->total/pStat->num) : 0;
case FETCH_QUEUE:
pStat = &mgmt->stat.msgWait[1];
return pStat->num ? (pStat->total/pStat->num) : 0;
default:
qError("unsupported queue type %d", type);
return -1;
}
}

View File

@ -950,4 +950,9 @@ void qWorkerDestroy(void **qWorkerMgmt) {
}
}
int64_t qWorkerGetWaitTimeInQueue(void *qWorkerMgmt, EQueueType type) {
return qwGetWaitTimeInQueue((SQWorker *)qWorkerMgmt, type);
}

View File

@ -635,7 +635,7 @@ void *queryThread(void *param) {
while (!qwtTestStop) {
qwtBuildQueryReqMsg(&queryRpc);
qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0);
if (qwtTestEnableSleep) {
taosUsleep(taosRand()%5);
}
@ -657,7 +657,7 @@ void *fetchThread(void *param) {
while (!qwtTestStop) {
qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0);
if (qwtTestEnableSleep) {
taosUsleep(taosRand()%5);
}
@ -679,7 +679,7 @@ void *dropThread(void *param) {
while (!qwtTestStop) {
qwtBuildDropReqMsg(&dropMsg, &dropRpc);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0);
if (qwtTestEnableSleep) {
taosUsleep(taosRand()%5);
}
@ -758,9 +758,9 @@ void *queryQueueThread(void *param) {
}
if (TDMT_VND_QUERY == queryRpc->msgType) {
qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc);
qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc, 0);
} else if (TDMT_VND_QUERY_CONTINUE == queryRpc->msgType) {
qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc);
qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc, 0);
} else {
printf("unknown msg in query queue, type:%d\n", queryRpc->msgType);
assert(0);
@ -815,13 +815,13 @@ void *fetchQueueThread(void *param) {
switch (fetchRpc->msgType) {
case TDMT_VND_FETCH:
qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc);
qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0);
break;
case TDMT_VND_CANCEL_TASK:
qWorkerProcessCancelMsg(mockPointer, mgmt, fetchRpc);
qWorkerProcessCancelMsg(mockPointer, mgmt, fetchRpc, 0);
break;
case TDMT_VND_DROP_TASK:
qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc);
qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc, 0);
break;
default:
printf("unknown msg type:%d in fetch queue", fetchRpc->msgType);
@ -878,16 +878,16 @@ TEST(seqTest, normalCase) {
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
ASSERT_EQ(code, 0);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0);
ASSERT_EQ(code, 0);
//code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
//ASSERT_EQ(code, 0);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0);
ASSERT_EQ(code, 0);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0);
ASSERT_EQ(code, 0);
qWorkerDestroy(&mgmt);
@ -914,10 +914,10 @@ TEST(seqTest, cancelFirst) {
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
ASSERT_EQ(code, 0);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0);
ASSERT_EQ(code, 0);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0);
ASSERT_TRUE(0 != code);
qWorkerDestroy(&mgmt);
@ -959,7 +959,7 @@ TEST(seqTest, randCase) {
if (r >= 0 && r < maxr/5) {
printf("Query,%d\n", t++);
qwtBuildQueryReqMsg(&queryRpc);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0);
} else if (r >= maxr/5 && r < maxr * 2/5) {
//printf("Ready,%d\n", t++);
//qwtBuildReadyReqMsg(&readyMsg, &readyRpc);
@ -970,14 +970,14 @@ TEST(seqTest, randCase) {
} else if (r >= maxr * 2/5 && r < maxr* 3/5) {
printf("Fetch,%d\n", t++);
qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0);
if (qwtTestEnableSleep) {
taosUsleep(1);
}
} else if (r >= maxr * 3/5 && r < maxr * 4/5) {
printf("Drop,%d\n", t++);
qwtBuildDropReqMsg(&dropMsg, &dropRpc);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0);
if (qwtTestEnableSleep) {
taosUsleep(1);
}

View File

@ -51,7 +51,7 @@ typedef struct SScalarCtx {
int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out);
SColumnInfoData* createColumnInfoData(SDataType* pType, int32_t numOfRows);
void sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode);
int32_t sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode);
#define GET_PARAM_TYPE(_c) ((_c)->columnData->info.type)
#define GET_PARAM_BYTES(_c) ((_c)->columnData->info.bytes)

View File

@ -3553,7 +3553,11 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) {
return DEAL_RES_CONTINUE;
}
sclConvertToTsValueNode(stat->precision, valueNode);
int32_t code = sclConvertToTsValueNode(stat->precision, valueNode);
if (code) {
stat->code = code;
return DEAL_RES_ERROR;
}
return DEAL_RES_CONTINUE;
}
@ -3687,7 +3691,7 @@ int32_t fltReviseNodes(SFilterInfo *pInfo, SNode** pNode, SFltTreeStat *pStat) {
for (int32_t i = 0; i < nodeNum; ++i) {
SValueNode *valueNode = *(SValueNode **)taosArrayGet(pStat->nodeList, i);
sclConvertToTsValueNode(pStat->precision, valueNode);
FLT_ERR_JRET(sclConvertToTsValueNode(pStat->precision, valueNode));
}
_return:

View File

@ -20,17 +20,19 @@ int32_t scalarGetOperatorParamNum(EOperatorType type) {
return 2;
}
void sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode) {
int32_t sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode) {
char *timeStr = valueNode->datum.p;
if (convertStringToTimestamp(valueNode->node.resType.type, valueNode->datum.p, precision, &valueNode->datum.i) !=
TSDB_CODE_SUCCESS) {
valueNode->datum.i = 0;
int32_t code = convertStringToTimestamp(valueNode->node.resType.type, valueNode->datum.p, precision, &valueNode->datum.i);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
taosMemoryFree(timeStr);
valueNode->typeData = valueNode->datum.i;
valueNode->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
valueNode->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;
return TSDB_CODE_SUCCESS;
}
@ -546,6 +548,7 @@ EDealRes sclRewriteBasedOnOptr(SNode** pNode, SScalarCtx *ctx, EOperatorType opT
EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) {
SOperatorNode *node = (SOperatorNode *)*pNode;
int32_t code = 0;
if (node->pLeft && (QUERY_NODE_VALUE == nodeType(node->pLeft))) {
SValueNode *valueNode = (SValueNode *)node->pLeft;
@ -555,7 +558,11 @@ EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) {
if (IS_STR_DATA_TYPE(valueNode->node.resType.type) && node->pRight && nodesIsExprNode(node->pRight)
&& ((SExprNode*)node->pRight)->resType.type == TSDB_DATA_TYPE_TIMESTAMP) {
sclConvertToTsValueNode(((SExprNode*)node->pRight)->resType.precision, valueNode);
code = sclConvertToTsValueNode(((SExprNode*)node->pRight)->resType.precision, valueNode);
if (code) {
ctx->code = code;
return DEAL_RES_ERROR;
}
}
}
@ -567,7 +574,11 @@ EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) {
if (IS_STR_DATA_TYPE(valueNode->node.resType.type) && node->pLeft && nodesIsExprNode(node->pLeft)
&& ((SExprNode*)node->pLeft)->resType.type == TSDB_DATA_TYPE_TIMESTAMP) {
sclConvertToTsValueNode(((SExprNode*)node->pLeft)->resType.precision, valueNode);
code = sclConvertToTsValueNode(((SExprNode*)node->pLeft)->resType.precision, valueNode);
if (code) {
ctx->code = code;
return DEAL_RES_ERROR;
}
}
}

View File

@ -94,6 +94,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_ELOG("rsp not processed cause of job status, job status:%s, rspCode:0x%x", jobTaskStatusStr(status),
rspCode);
taosMemoryFreeClear(msg);
SCH_RET(atomic_load_32(&pJob->errCode));
}
@ -121,6 +122,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
}
SCH_ERR_JRET(rspCode);
taosMemoryFreeClear(msg);
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
break;
}
@ -145,6 +148,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
}
SCH_ERR_JRET(rspCode);
taosMemoryFreeClear(msg);
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
break;
}
@ -164,6 +169,9 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
if (NULL == msg) {
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
taosMemoryFreeClear(msg);
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
break;
}
@ -210,6 +218,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
}
taosMemoryFreeClear(msg);
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
break;
@ -224,6 +234,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_ERR_JRET(rsp->code);
SCH_ERR_JRET(schSaveJobQueryRes(pJob, rsp));
taosMemoryFreeClear(msg);
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
@ -275,6 +287,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
}
taosMemoryFreeClear(msg);
return TSDB_CODE_SUCCESS;
}
@ -282,6 +296,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_ERR_JRET(schFetchFromRemote(pJob));
taosMemoryFreeClear(msg);
return TSDB_CODE_SUCCESS;
}
@ -300,6 +316,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed);
msg = NULL;
schProcessOnDataFetched(pJob);
break;
}
@ -322,6 +340,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
_return:
taosMemoryFreeClear(msg);
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}

View File

@ -74,6 +74,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_REPEAT_INIT, "Repeat initialization
TAOS_DEFINE_ERROR(TSDB_CODE_DUP_KEY, "Cannot add duplicate keys to hash")
TAOS_DEFINE_ERROR(TSDB_CODE_NEED_RETRY, "Retry needed")
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE, "Out of memory in rpc queue")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TIMESTAMP, "Invalid timestamp format")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs")

View File

@ -26,6 +26,7 @@ typedef struct STaosQnode STaosQnode;
typedef struct STaosQnode {
STaosQnode *next;
STaosQueue *queue;
int64_t timestamp;
int32_t size;
int8_t itype;
int8_t reserved[3];
@ -144,6 +145,7 @@ void *taosAllocateQitem(int32_t size, EQItype itype) {
STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
pNode->size = size;
pNode->itype = itype;
pNode->timestamp = taosGetTimestampUs();
if (pNode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -393,7 +395,7 @@ void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp) {
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, int64_t *ts, void **ahandle, FItem *itemFp) {
STaosQnode *pNode = NULL;
int32_t code = 0;
@ -415,6 +417,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FI
*ppItem = pNode->item;
if (ahandle) *ahandle = queue->ahandle;
if (itemFp) *itemFp = queue->itemFp;
if (ts) *ts = pNode->timestamp;
queue->head = pNode->next;
if (queue->head == NULL) queue->tail = NULL;

View File

@ -75,19 +75,20 @@ static void *tQWorkerThreadFp(SQWorker *worker) {
void *msg = NULL;
void *ahandle = NULL;
int32_t code = 0;
int64_t ts = 0;
taosBlockSIGPIPE();
setThreadName(pool->name);
uDebug("worker:%s:%d is running", pool->name, worker->id);
while (1) {
if (taosReadQitemFromQset(pool->qset, (void **)&msg, &ahandle, &fp) == 0) {
if (taosReadQitemFromQset(pool->qset, (void **)&msg, &ts, &ahandle, &fp) == 0) {
uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset);
break;
}
if (fp != NULL) {
SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num};
SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num, .timestamp = ts};
(*fp)(&info, msg);
}
}

View File

@ -587,6 +587,8 @@ int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision) {
int32_t width = (int32_t)strlen(field->name);
switch (field->type) {
case TSDB_DATA_TYPE_NULL:
return TMAX(4, width); // null
case TSDB_DATA_TYPE_BOOL:
return TMAX(5, width); // 'false'