diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7ff70b243a..a7da778513 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2307,6 +2307,7 @@ typedef struct { typedef struct { SExplainRsp rsp; uint64_t qId; + uint64_t cId; uint64_t tId; int64_t rId; int32_t eId; @@ -2660,6 +2661,7 @@ typedef struct SSubQueryMsg { SMsgHead header; uint64_t sId; uint64_t queryId; + uint64_t clientId; uint64_t taskId; int64_t refId; int32_t execId; @@ -2689,6 +2691,7 @@ typedef struct { SMsgHead header; uint64_t sId; uint64_t queryId; + uint64_t clientId; uint64_t taskId; int32_t execId; } SQueryContinueReq; @@ -2723,6 +2726,7 @@ typedef struct { SMsgHead header; uint64_t sId; uint64_t queryId; + uint64_t clientId; uint64_t taskId; int32_t execId; SOperatorParam* pOpParam; @@ -2738,6 +2742,7 @@ typedef struct { typedef struct { uint64_t queryId; + uint64_t clientId; uint64_t taskId; int64_t refId; int32_t execId; @@ -2784,6 +2789,7 @@ typedef struct { SMsgHead header; uint64_t sId; uint64_t queryId; + uint64_t clientId; uint64_t taskId; int64_t refId; int32_t execId; @@ -2797,6 +2803,7 @@ typedef struct { SMsgHead header; uint64_t sId; uint64_t queryId; + uint64_t clientId; uint64_t taskId; int64_t refId; int32_t execId; @@ -2813,6 +2820,7 @@ typedef struct { SMsgHead header; uint64_t sId; uint64_t queryId; + uint64_t clientId; uint64_t taskId; int64_t refId; int32_t execId; @@ -4261,6 +4269,7 @@ typedef struct { SMsgHead header; uint64_t sId; uint64_t queryId; + uint64_t clientId; uint64_t taskId; uint32_t sqlLen; uint32_t phyLen; diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index d955a7b3b9..82cb899cb6 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -31,7 +31,7 @@ typedef void* DataSinkHandle; struct SRpcMsg; struct SSubplan; -typedef int32_t (*localFetchFp)(void*, uint64_t, uint64_t, uint64_t, int64_t, int32_t, void**, SArray*); +typedef int32_t (*localFetchFp)(void*, uint64_t, uint64_t, uint64_t, uint64_t, int64_t, int32_t, void**, SArray*); typedef struct { void* handle; diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index cfd9c1a422..48852e5552 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -624,6 +624,7 @@ typedef struct SAggPhysiNode { typedef struct SDownstreamSourceNode { ENodeType type; SQueryNodeAddr addr; + uint64_t clientId; uint64_t taskId; uint64_t schedId; int32_t execId; diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 83daf0376c..cb4e359727 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -105,11 +105,11 @@ void qWorkerDestroy(void **qWorkerMgmt); int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat); -int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, - SQWMsg *qwMsg, SArray *explainRes); +int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t cId, uint64_t tId, int64_t rId, + int32_t eId, SQWMsg *qwMsg, SArray *explainRes); -int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, - void **pRsp, SArray *explainRes); +int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t cId, uint64_t tId, int64_t rId, + int32_t eId, void **pRsp, SArray *explainRes); int32_t qWorkerDbgEnableDebug(char *option); diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index af8deff1a0..2988ffc4b1 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -83,6 +83,9 @@ void schedulerStopQueryHb(void* pTrans); int32_t schedulerUpdatePolicy(int32_t policy); int32_t schedulerEnableReSchedule(bool enableResche); +int32_t initClientId(void); +uint64_t getClientId(void); + /** * Cancel query job * @param pJob diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index c56a627ec7..fa9df5be73 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -983,6 +983,7 @@ void taos_init_imp(void) { SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100}; ENV_ERR_RET(catalogInit(&cfg), "failed to init catalog"); ENV_ERR_RET(schedulerInit(), "failed to init scheduler"); + ENV_ERR_RET(initClientId(), "failed to init clientId"); tscDebug("starting to initialize TAOS driver"); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 6d1699b911..edf0db9954 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -8717,6 +8717,7 @@ int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq) { TAOS_CHECK_EXIT(tEncodeCStrWithLen(&encoder, pReq->sql, pReq->sqlLen)); TAOS_CHECK_EXIT(tEncodeU32(&encoder, pReq->msgLen)); TAOS_CHECK_EXIT(tEncodeBinary(&encoder, (uint8_t *)pReq->msg, pReq->msgLen)); + TAOS_CHECK_EXIT(tEncodeU64(&encoder, pReq->clientId)); tEndEncode(&encoder); @@ -8765,6 +8766,11 @@ int32_t tDeserializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq) TAOS_CHECK_EXIT(tDecodeCStrAlloc(&decoder, &pReq->sql)); TAOS_CHECK_EXIT(tDecodeU32(&decoder, &pReq->msgLen)); TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void **)&pReq->msg, NULL)); + if (!tDecodeIsEnd(&decoder)) { + TAOS_CHECK_EXIT(tDecodeU64(&decoder, &pReq->clientId)); + } else { + pReq->clientId = 0; + } tEndDecode(&decoder); @@ -8894,6 +8900,7 @@ int32_t tSerializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq) { } else { TAOS_CHECK_EXIT(tEncodeI32(&encoder, 0)); } + TAOS_CHECK_EXIT(tEncodeU64(&encoder, pReq->clientId)); tEndEncode(&encoder); @@ -8943,6 +8950,11 @@ int32_t tDeserializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq) } TAOS_CHECK_EXIT(tDeserializeSOperatorParam(&decoder, pReq->pOpParam)); } + if (!tDecodeIsEnd(&decoder)) { + TAOS_CHECK_EXIT(tDecodeU64(&decoder, &pReq->clientId)); + } else { + pReq->clientId = 0; + } tEndDecode(&decoder); @@ -9055,6 +9067,7 @@ int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) { TAOS_CHECK_EXIT(tEncodeU64(&encoder, pReq->taskId)); TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->refId)); TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->execId)); + TAOS_CHECK_EXIT(tEncodeU64(&encoder, pReq->clientId)); tEndEncode(&encoder); @@ -9095,6 +9108,11 @@ int32_t tDeserializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) TAOS_CHECK_EXIT(tDecodeU64(&decoder, &pReq->taskId)); TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->refId)); TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->execId)); + if (!tDecodeIsEnd(&decoder)) { + TAOS_CHECK_EXIT(tDecodeU64(&decoder, &pReq->clientId)); + } else { + pReq->clientId = 0; + } tEndDecode(&decoder); @@ -9123,6 +9141,7 @@ int32_t tSerializeSTaskNotifyReq(void *buf, int32_t bufLen, STaskNotifyReq *pReq TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->refId)); TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->execId)); TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->type)); + TAOS_CHECK_EXIT(tEncodeU64(&encoder, pReq->clientId)); tEndEncode(&encoder); @@ -9164,6 +9183,11 @@ int32_t tDeserializeSTaskNotifyReq(void *buf, int32_t bufLen, STaskNotifyReq *pR TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->refId)); TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->execId)); TAOS_CHECK_EXIT(tDecodeI32(&decoder, (int32_t *)&pReq->type)); + if (!tDecodeIsEnd(&decoder)) { + TAOS_CHECK_EXIT(tDecodeU64(&decoder, &pReq->clientId)); + } else { + pReq->clientId = 0; + } tEndDecode(&decoder); @@ -9353,6 +9377,10 @@ int32_t tSerializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *pR TAOS_CHECK_EXIT(tEncodeI32(&encoder, status->execId)); TAOS_CHECK_EXIT(tEncodeI8(&encoder, status->status)); } + for (int32_t i = 0; i < num; ++i) { + STaskStatus *status = taosArrayGet(pRsp->taskStatus, i); + TAOS_CHECK_EXIT(tEncodeU64(&encoder, status->clientId)); + } } else { TAOS_CHECK_EXIT(tEncodeI32(&encoder, 0)); } @@ -9396,6 +9424,12 @@ int32_t tDeserializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp * TAOS_CHECK_EXIT(terrno); } } + if (!tDecodeIsEnd(&decoder)) { + for (int32_t i = 0; i < num; ++i) { + STaskStatus *status = taosArrayGet(pRsp->taskStatus, i); + TAOS_CHECK_EXIT(tDecodeU64(&decoder, &status->clientId)); + } + } } else { pRsp->taskStatus = NULL; } @@ -9560,6 +9594,7 @@ int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) { TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->sql)); TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->msg, pReq->phyLen)); TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->source)); + TAOS_CHECK_EXIT(tEncodeU64(&encoder, pReq->clientId)); tEndEncode(&encoder); _exit: @@ -9608,6 +9643,11 @@ int32_t tDeserializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) { if (!tDecodeIsEnd(&decoder)) { TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->source)); } + if (!tDecodeIsEnd(&decoder)) { + TAOS_CHECK_EXIT(tDecodeU64(&decoder, &pReq->clientId)); + } else { + pReq->clientId = 0; + } tEndDecode(&decoder); _exit: diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 042fcf0120..7222f2d297 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -121,10 +121,10 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn } } else { pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; - qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 - ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows, - pExchangeInfo->loadInfo.totalRows, i + 1, totalSources); + qDebug("%s vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 + " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i, + pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, i + 1, totalSources); taosMemoryFreeClear(pDataInfo->pRsp); } break; @@ -141,17 +141,17 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn if (pRsp->completed == 1) { pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; - qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 + qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks, - pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, i + 1, - totalSources); + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i, + pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, + pLoadInfo->totalSize / 1024.0, i + 1, totalSources); } else { - qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d blocks:%d, numOfRows:%" PRId64 - ", totalRows:%" PRIu64 ", total:%.2f Kb", - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks, - pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0); + qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 + " execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb", + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, + pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0); } taosMemoryFreeClear(pDataInfo->pRsp); @@ -640,9 +640,9 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas if (pSource->localExec) { SDataBuf pBuf = {0}; - int32_t code = - (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId, - pSource->taskId, 0, pSource->execId, &pBuf.pData, pTaskInfo->localFetch.explainRes); + int32_t code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId, + pSource->clientId, pSource->taskId, 0, pSource->execId, &pBuf.pData, + pTaskInfo->localFetch.explainRes); code = loadRemoteDataCallback(pWrapper, &pBuf, code); QUERY_CHECK_CODE(code, lino, _end); taosMemoryFree(pWrapper); @@ -650,6 +650,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas SResFetchReq req = {0}; req.header.vgId = pSource->addr.nodeId; req.sId = pSource->schedId; + req.clientId = pSource->clientId; req.taskId = pSource->taskId; req.queryId = pTaskInfo->id.queryId; req.execId = pSource->execId; @@ -691,9 +692,10 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas freeOperatorParam(req.pOpParam, OP_GET_PARAM); - qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %p, %d/%" PRIzu, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, - pSource->execId, pExchangeInfo, sourceIndex, totalSources); + qDebug("%s build fetch msg and send to vgId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 + ", execId:%d, %p, %d/%" PRIzu, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId, + pSource->taskId, pSource->execId, pExchangeInfo, sourceIndex, totalSources); // send the fetch remote task result reques SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); @@ -974,8 +976,9 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { } if (pDataInfo->code != TSDB_CODE_SUCCESS) { - qError("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d error happens, code:%s", GET_TASKID(pTaskInfo), - pSource->addr.nodeId, pSource->taskId, pSource->execId, tstrerror(pDataInfo->code)); + qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s", + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, + tstrerror(pDataInfo->code)); pOperator->pTaskInfo->code = pDataInfo->code; return pOperator->pTaskInfo->code; } @@ -984,10 +987,10 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; if (pRsp->numOfRows == 0) { - qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64 - ", totalRows:%" PRIu64 " try next", - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pExchangeInfo->current + 1, - pDataInfo->totalRows, pLoadInfo->totalRows); + qDebug("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 + " execId:%d %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, + pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows); pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pExchangeInfo->current += 1; @@ -1002,19 +1005,19 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; if (pRsp->completed == 1) { - qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64 + qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows, - pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1, - totalSources); + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, + pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, + pExchangeInfo->current + 1, totalSources); pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pExchangeInfo->current += 1; } else { - qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64 ", totalRows:%" PRIu64 - ", totalBytes:%" PRIu64, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows, - pLoadInfo->totalRows, pLoadInfo->totalSize); + qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64 + ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, + pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize); } updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator); diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 1a5785190b..ba87912670 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -851,6 +851,7 @@ static int32_t slotDescCopy(const SSlotDescNode* pSrc, SSlotDescNode* pDst) { static int32_t downstreamSourceCopy(const SDownstreamSourceNode* pSrc, SDownstreamSourceNode* pDst) { COPY_OBJECT_FIELD(addr, sizeof(SQueryNodeAddr)); + COPY_SCALAR_FIELD(clientId); COPY_SCALAR_FIELD(taskId); COPY_SCALAR_FIELD(schedId); COPY_SCALAR_FIELD(execId); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 3275cfd838..f7f858db78 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -5259,6 +5259,7 @@ static int32_t jsonToColumnDefNode(const SJson* pJson, void* pObj) { } static const char* jkDownstreamSourceAddr = "Addr"; +static const char* jkDownstreamSourceClientId = "ClientId"; static const char* jkDownstreamSourceTaskId = "TaskId"; static const char* jkDownstreamSourceSchedId = "SchedId"; static const char* jkDownstreamSourceExecId = "ExecId"; @@ -5268,6 +5269,9 @@ static int32_t downstreamSourceNodeToJson(const void* pObj, SJson* pJson) { const SDownstreamSourceNode* pNode = (const SDownstreamSourceNode*)pObj; int32_t code = tjsonAddObject(pJson, jkDownstreamSourceAddr, queryNodeAddrToJson, &pNode->addr); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceClientId, pNode->clientId); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceTaskId, pNode->taskId); } @@ -5288,6 +5292,9 @@ static int32_t jsonToDownstreamSourceNode(const SJson* pJson, void* pObj) { SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)pObj; int32_t code = tjsonToObject(pJson, jkDownstreamSourceAddr, jsonToQueryNodeAddr, &pNode->addr); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetUBigIntValue(pJson, jkDownstreamSourceClientId, &pNode->clientId); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonGetUBigIntValue(pJson, jkDownstreamSourceTaskId, &pNode->taskId); } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 28d0b9fbd4..bf3ea66e47 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -1769,6 +1769,9 @@ static int32_t downstreamSourceNodeInlineToMsg(const void* pObj, STlvEncoder* pE if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeValueI32(pEncoder, pNode->fetchMsgType); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeValueU64(pEncoder, pNode->clientId); + } return code; } @@ -1793,6 +1796,9 @@ static int32_t msgToDownstreamSourceNodeInlineToMsg(STlvDecoder* pDecoder, void* if (TSDB_CODE_SUCCESS == code) { code = tlvDecodeValueI32(pDecoder, &pNode->fetchMsgType); } + if (TSDB_CODE_SUCCESS == code && !tlvDecodeEnd(pDecoder)) { + code = tlvDecodeValueU64(pDecoder, &pNode->clientId); + } return code; } diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 7a902bdd66..708c285aea 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -215,8 +215,8 @@ typedef struct SQWorkerMgmt { #define QW_CTX_NOT_EXISTS_ERR_CODE(mgmt) \ (atomic_load_8(&(mgmt)->nodeStopped) ? TSDB_CODE_VND_STOPPED : TSDB_CODE_QRY_TASK_CTX_NOT_EXIST) -#define QW_FPARAMS_DEF SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId -#define QW_IDS() sId, qId, tId, rId, eId +#define QW_FPARAMS_DEF SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t cId, uint64_t tId, int64_t rId, int32_t eId +#define QW_IDS() sId, qId, cId, tId, rId, eId #define QW_FPARAMS() mgmt, QW_IDS() #define QW_STAT_INC(_item, _n) (void)atomic_add_fetch_64(&(_item), _n) @@ -257,18 +257,20 @@ typedef struct SQWorkerMgmt { #define QW_FETCH_RUNNING(ctx) ((ctx)->inFetch) #define QW_QUERY_NOT_STARTED(ctx) (QW_GET_PHASE(ctx) == -1) -#define QW_SET_QTID(id, qId, tId, eId) \ - do { \ - *(uint64_t *)(id) = (qId); \ - *(uint64_t *)((char *)(id) + sizeof(qId)) = (tId); \ - *(int32_t *)((char *)(id) + sizeof(qId) + sizeof(tId)) = (eId); \ +#define QW_SET_QTID(id, qId, cId, tId, eId) \ + do { \ + *(uint64_t *)(id) = (qId); \ + *(uint64_t *)((char *)(id) + sizeof(qId)) = (cId); \ + *(uint64_t *)((char *)(id) + sizeof(qId) + sizeof(cId)) = (tId); \ + *(int32_t *)((char *)(id) + sizeof(qId) + sizeof(cId) + sizeof(tId)) = (eId); \ } while (0) -#define QW_GET_QTID(id, qId, tId, eId) \ - do { \ - (qId) = *(uint64_t *)(id); \ - (tId) = *(uint64_t *)((char *)(id) + sizeof(qId)); \ - (eId) = *(int32_t *)((char *)(id) + sizeof(qId) + sizeof(tId)); \ +#define QW_GET_QTID(id, qId, cId, tId, eId) \ + do { \ + (qId) = *(uint64_t *)(id); \ + (cId) = *(uint64_t *)((char *)(id) + sizeof(qId)); \ + (tId) = *(uint64_t *)((char *)(id) + sizeof(qId) + sizeof(cId)); \ + (eId) = *(int32_t *)((char *)(id) + sizeof(qId) + sizeof(cId) + sizeof(tId)); \ } while (0) #define QW_ERR_RET(c) \ @@ -310,25 +312,31 @@ typedef struct SQWorkerMgmt { #define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%" PRIx64 " " param, mgmt, sId, __VA_ARGS__) #define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%" PRIx64 " " param, mgmt, sId, __VA_ARGS__) -#define QW_TASK_ELOG(param, ...) qError("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId, __VA_ARGS__) -#define QW_TASK_WLOG(param, ...) qWarn("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId, __VA_ARGS__) -#define QW_TASK_DLOG(param, ...) qDebug("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId, __VA_ARGS__) +#define QW_TASK_ELOG(param, ...) \ + qError("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__) +#define QW_TASK_WLOG(param, ...) \ + qWarn("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__) +#define QW_TASK_DLOG(param, ...) \ + qDebug("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__) #define QW_TASK_DLOGL(param, ...) \ - qDebugL("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId, __VA_ARGS__) + qDebugL("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__) -#define QW_TASK_ELOG_E(param) qError("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId) -#define QW_TASK_WLOG_E(param) qWarn("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId) -#define QW_TASK_DLOG_E(param) qDebug("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId) +#define QW_TASK_ELOG_E(param) \ + qError("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId) +#define QW_TASK_WLOG_E(param) \ + qWarn("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId) +#define QW_TASK_DLOG_E(param) \ + qDebug("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId) -#define QW_SCH_TASK_ELOG(param, ...) \ - qError("QW:%p SID:0x%" PRIx64 ",qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, \ - __VA_ARGS__) -#define QW_SCH_TASK_WLOG(param, ...) \ - qWarn("QW:%p SID:0x%" PRIx64 ",qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, \ - __VA_ARGS__) -#define QW_SCH_TASK_DLOG(param, ...) \ - qDebug("QW:%p SID:0x%" PRIx64 ",qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, \ - __VA_ARGS__) +#define QW_SCH_TASK_ELOG(param, ...) \ + qError("QW:%p SID:0x%" PRIx64 ",qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, \ + qId, cId, tId, eId, __VA_ARGS__) +#define QW_SCH_TASK_WLOG(param, ...) \ + qWarn("QW:%p SID:0x%" PRIx64 ",qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, \ + cId, tId, eId, __VA_ARGS__) +#define QW_SCH_TASK_DLOG(param, ...) \ + qDebug("QW:%p SID:0x%" PRIx64 ",qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, \ + qId, cId, tId, eId, __VA_ARGS__) #define QW_LOCK_DEBUG(...) \ do { \ diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index d3b8d36b25..897080df3e 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -96,14 +96,14 @@ void qwDbgDumpSchInfo(SQWorker *mgmt, SQWSchStatus *sch, int32_t i) { int32_t taskNum = taosHashGetSize(sch->tasksHash); QW_DLOG("***The %dth scheduler status, hbBrokenTs:%" PRId64 ",taskNum:%d", i, sch->hbBrokenTs, taskNum); - uint64_t qId, tId; + uint64_t qId, cId, tId; int32_t eId; SQWTaskStatus *pTask = NULL; void *pIter = taosHashIterate(sch->tasksHash, NULL); while (pIter) { pTask = (SQWTaskStatus *)pIter; void *key = taosHashGetKey(pIter, NULL); - QW_GET_QTID(key, qId, tId, eId); + QW_GET_QTID(key, qId, cId, tId, eId); QW_TASK_DLOG("job refId:%" PRIx64 ", code:%x, task status:%d", pTask->refId, pTask->code, pTask->status); @@ -118,13 +118,13 @@ void qwDbgDumpTasksInfo(SQWorker *mgmt) { int32_t i = 0; SQWTaskCtx *ctx = NULL; - uint64_t qId, tId; + uint64_t qId, cId, tId; int32_t eId; void *pIter = taosHashIterate(mgmt->ctxHash, NULL); while (pIter) { ctx = (SQWTaskCtx *)pIter; void *key = taosHashGetKey(pIter, NULL); - QW_GET_QTID(key, qId, tId, eId); + QW_GET_QTID(key, qId, cId, tId, eId); QW_TASK_DLOG("%p lock:%x, phase:%d, type:%d, explain:%d, needFetch:%d, localExec:%d, queryMsgType:%d, " "sId:%" PRId64 ", level:%d, queryGotData:%d, queryRsped:%d, queryEnd:%d, queryContinue:%d, queryInQueue:%d, " diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 20b81bfc14..7dbad90cc0 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -233,6 +233,7 @@ int32_t qwBuildAndSendDropMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { qMsg.header.contLen = 0; qMsg.sId = sId; qMsg.queryId = qId; + qMsg.clientId = cId; qMsg.taskId = tId; qMsg.refId = rId; qMsg.execId = eId; @@ -284,6 +285,7 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { req->header.vgId = mgmt->nodeId; req->sId = sId; req->queryId = qId; + req->clientId = cId; req->taskId = tId; req->execId = eId; @@ -312,6 +314,7 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { qMsg.header.contLen = 0; qMsg.sId = sId; qMsg.queryId = qId; + qMsg.clientId = cId; qMsg.taskId = tId; qMsg.refId = rId; qMsg.execId = eId; @@ -416,6 +419,7 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg, bool chkGran uint64_t sId = msg.sId; uint64_t qId = msg.queryId; + uint64_t cId = msg.clientId; uint64_t tId = msg.taskId; int64_t rId = msg.refId; int32_t eId = msg.execId; @@ -447,6 +451,7 @@ int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { uint64_t sId = msg.sId; uint64_t qId = msg.queryId; + uint64_t cId = msg.clientId; uint64_t tId = msg.taskId; int64_t rId = msg.refId; int32_t eId = msg.execId; @@ -479,6 +484,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int uint64_t sId = msg.sId; uint64_t qId = msg.queryId; + uint64_t cId = msg.clientId; uint64_t tId = msg.taskId; int64_t rId = msg.refId; int32_t eId = msg.execId; @@ -524,6 +530,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in uint64_t sId = msg->sId; uint64_t qId = msg->queryId; + uint64_t cId = msg->clientId; uint64_t tId = msg->taskId; int64_t rId = 0; int32_t eId = msg->execId; @@ -557,6 +564,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int uint64_t sId = req.sId; uint64_t qId = req.queryId; + uint64_t cId = req.clientId; uint64_t tId = req.taskId; int64_t rId = 0; int32_t eId = req.execId; @@ -604,12 +612,14 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in msg->sId = be64toh(msg->sId); msg->queryId = be64toh(msg->queryId); + msg->clientId = be64toh(msg->clientId); msg->taskId = be64toh(msg->taskId); msg->refId = be64toh(msg->refId); msg->execId = ntohl(msg->execId); uint64_t sId = msg->sId; uint64_t qId = msg->queryId; + uint64_t cId = msg->clientId; uint64_t tId = msg->taskId; int64_t rId = msg->refId; int32_t eId = msg->execId; @@ -646,6 +656,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6 uint64_t sId = msg.sId; uint64_t qId = msg.queryId; + uint64_t cId = msg.clientId; uint64_t tId = msg.taskId; int64_t rId = msg.refId; int32_t eId = msg.execId; @@ -684,6 +695,7 @@ int32_t qWorkerProcessNotifyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in uint64_t sId = msg.sId; uint64_t qId = msg.queryId; + uint64_t cId = msg.clientId; uint64_t tId = msg.taskId; int64_t rId = msg.refId; int32_t eId = msg.execId; @@ -753,6 +765,7 @@ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SD uint64_t sId = req.sId; uint64_t qId = req.queryId; + uint64_t cId = req.clientId; uint64_t tId = req.taskId; int64_t rId = 0; int32_t eId = -1; diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index ef07a42629..917579deb0 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -137,8 +137,8 @@ int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchS void qwReleaseScheduler(int32_t rwType, SQWorker *mgmt) { QW_UNLOCK(rwType, &mgmt->schLock); } int32_t qwAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, SQWTaskStatus **task) { - char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0}; - QW_SET_QTID(id, qId, tId, eId); + char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0}; + QW_SET_QTID(id, qId, cId, tId, eId); QW_LOCK(rwType, &sch->tasksLock); *task = taosHashGet(sch->tasksHash, id, sizeof(id)); @@ -153,8 +153,8 @@ int32_t qwAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, S int32_t qwAddTaskStatusImpl(QW_FPARAMS_DEF, SQWSchStatus *sch, int32_t rwType, int32_t status, SQWTaskStatus **task) { int32_t code = 0; - char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0}; - QW_SET_QTID(id, qId, tId, eId); + char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0}; + QW_SET_QTID(id, qId, cId, tId, eId); SQWTaskStatus ntask = {0}; ntask.status = status; @@ -209,8 +209,8 @@ int32_t qwAddAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch void qwReleaseTaskStatus(int32_t rwType, SQWSchStatus *sch) { QW_UNLOCK(rwType, &sch->tasksLock); } int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { - char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0}; - QW_SET_QTID(id, qId, tId, eId); + char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0}; + QW_SET_QTID(id, qId, cId, tId, eId); *ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id)); if (NULL == (*ctx)) { @@ -222,8 +222,8 @@ int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { } int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { - char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0}; - QW_SET_QTID(id, qId, tId, eId); + char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0}; + QW_SET_QTID(id, qId, cId, tId, eId); *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id)); if (NULL == (*ctx)) { @@ -235,8 +235,8 @@ int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { } int32_t qwAddTaskCtxImpl(QW_FPARAMS_DEF, bool acquire, SQWTaskCtx **ctx) { - char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0}; - QW_SET_QTID(id, qId, tId, eId); + char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0}; + QW_SET_QTID(id, qId, cId, tId, eId); SQWTaskCtx nctx = {0}; @@ -347,6 +347,7 @@ int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { (void)memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo)); localRsp.rsp.subplanInfo = pExec; localRsp.qId = qId; + localRsp.cId = cId; localRsp.tId = tId; localRsp.rId = rId; localRsp.eId = eId; @@ -376,8 +377,8 @@ _return: int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { - char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0}; - QW_SET_QTID(id, qId, tId, eId); + char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0}; + QW_SET_QTID(id, qId, cId, tId, eId); SQWTaskCtx octx; SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id)); @@ -411,8 +412,8 @@ int32_t qwDropTaskStatus(QW_FPARAMS_DEF) { SQWTaskStatus *task = NULL; int32_t code = 0; - char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0}; - QW_SET_QTID(id, qId, tId, eId); + char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0}; + QW_SET_QTID(id, qId, cId, tId, eId); if (qwAcquireScheduler(mgmt, sId, QW_WRITE, &sch)) { QW_TASK_WLOG_E("scheduler does not exist"); @@ -465,8 +466,8 @@ _return: int32_t qwHandleDynamicTaskEnd(QW_FPARAMS_DEF) { - char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0}; - QW_SET_QTID(id, qId, tId, eId); + char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0}; + QW_SET_QTID(id, qId, cId, tId, eId); SQWTaskCtx octx; SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id)); @@ -588,14 +589,14 @@ void qwDestroyImpl(void *pMgmt) { mgmt->hbTimer = NULL; taosTmrCleanUp(mgmt->timer); - uint64_t qId, tId; + uint64_t qId, cId, tId; int32_t eId; void *pIter = taosHashIterate(mgmt->ctxHash, NULL); while (pIter) { SQWTaskCtx *ctx = (SQWTaskCtx *)pIter; void *key = taosHashGetKey(pIter, NULL); - QW_GET_QTID(key, qId, tId, eId); + QW_GET_QTID(key, qId, cId, tId, eId); qwFreeTaskCtx(ctx); QW_TASK_DLOG_E("task ctx freed"); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 9b96c1e519..13e1d0e231 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -19,7 +19,7 @@ SQWorkerMgmt gQwMgmt = { }; void qwStopAllTasks(SQWorker *mgmt) { - uint64_t qId, tId, sId; + uint64_t qId, cId, tId, sId; int32_t eId; int64_t rId = 0; int32_t code = TSDB_CODE_SUCCESS; @@ -28,7 +28,7 @@ void qwStopAllTasks(SQWorker *mgmt) { while (pIter) { SQWTaskCtx *ctx = (SQWTaskCtx *)pIter; void *key = taosHashGetKey(pIter, NULL); - QW_GET_QTID(key, qId, tId, eId); + QW_GET_QTID(key, qId, cId, tId, eId); QW_LOCK(QW_WRITE, &ctx->lock); @@ -288,7 +288,7 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) // TODO GET EXECUTOR API TO GET MORE INFO - QW_GET_QTID(key, status.queryId, status.taskId, status.execId); + QW_GET_QTID(key, status.queryId, status.clientId, status.taskId, status.execId); status.status = taskStatus->status; status.refId = taskStatus->refId; @@ -1473,8 +1473,8 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt return TSDB_CODE_SUCCESS; } -int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, - SQWMsg *qwMsg, SArray *explainRes) { +int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t cId, uint64_t tId, int64_t rId, + int32_t eId, SQWMsg *qwMsg, SArray *explainRes) { SQWorker *mgmt = (SQWorker *)pMgmt; int32_t code = 0; SQWTaskCtx *ctx = NULL; @@ -1538,8 +1538,8 @@ _return: QW_RET(code); } -int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, - void **pRsp, SArray *explainRes) { +int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t cId, uint64_t tId, int64_t rId, + int32_t eId, void **pRsp, SArray *explainRes) { SQWorker *mgmt = (SQWorker *)pMgmt; int32_t code = 0; int32_t dataLen = 0; diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 96b9d2da8d..6a910453f0 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -142,8 +142,9 @@ typedef struct SSchedulerCfg { } SSchedulerCfg; typedef struct SSchedulerMgmt { - uint64_t taskId; // sequential taksId - uint64_t sId; // schedulerId + uint64_t clientId; // unique clientId + uint64_t taskId; // sequential taksId + uint64_t sId; // schedulerId SSchedulerCfg cfg; bool exit; int32_t jobRef; @@ -163,6 +164,7 @@ typedef struct SSchTaskCallbackParam { SSchCallbackParamHeader head; uint64_t queryId; int64_t refId; + uint64_t clientId; uint64_t taskId; int32_t execId; void *pTrans; @@ -222,6 +224,7 @@ typedef struct SSchTimerParam { } SSchTimerParam; typedef struct SSchTask { + uint64_t clientId; // current client id uint64_t taskId; // task id SRWLatch lock; // task reentrant lock int32_t maxExecTimes; // task max exec times @@ -329,6 +332,7 @@ extern SSchedulerMgmt schMgmt; #define SCH_LOCK_TASK(_task) SCH_LOCK(SCH_WRITE, &(_task)->lock) #define SCH_UNLOCK_TASK(_task) SCH_UNLOCK(SCH_WRITE, &(_task)->lock) +#define SCH_CLIENT_ID(_task) ((_task) ? (_task)->clientId : -1) #define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1) #define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1) @@ -449,21 +453,21 @@ extern SSchedulerMgmt schMgmt; #define SCH_JOB_ELOG(param, ...) qError("qid:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__) #define SCH_JOB_DLOG(param, ...) qDebug("qid:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__) -#define SCH_TASK_ELOG(param, ...) \ - qError("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \ - __VA_ARGS__) -#define SCH_TASK_DLOG(param, ...) \ - qDebug("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \ - __VA_ARGS__) -#define SCH_TASK_TLOG(param, ...) \ - qTrace("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \ - __VA_ARGS__) -#define SCH_TASK_DLOGL(param, ...) \ - qDebugL("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \ - __VA_ARGS__) -#define SCH_TASK_WLOG(param, ...) \ - qWarn("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \ - __VA_ARGS__) +#define SCH_TASK_ELOG(param, ...) \ + qError("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \ + SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__) +#define SCH_TASK_DLOG(param, ...) \ + qDebug("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \ + SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__) +#define SCH_TASK_TLOG(param, ...) \ + qTrace("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \ + SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__) +#define SCH_TASK_DLOGL(param, ...) \ + qDebugL("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \ + SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__) +#define SCH_TASK_WLOG(param, ...) \ + qWarn("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \ + SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__) #define SCH_SET_ERRNO(_err) \ do { \ diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index b15a6a09d3..eefb32f783 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -500,8 +500,8 @@ _return: int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) { SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; - qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", pParam->queryId, pParam->taskId, - code); + qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", pParam->queryId, + pParam->clientId, pParam->taskId, code); // called if drop task rsp received code (void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); // ignore error @@ -517,8 +517,8 @@ int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) { int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code) { SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; - qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " task notify rsp received, code:0x%x", pParam->queryId, pParam->taskId, - code); + qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 " task notify rsp received, code:0x%x", pParam->queryId, + pParam->clientId, pParam->taskId, code); if (pMsg) { taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); @@ -595,6 +595,7 @@ int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, int32_t msgType, bo param->queryId = pJob->queryId; param->refId = pJob->refId; + param->clientId = SCH_CLIENT_ID(pTask); param->taskId = SCH_TASK_ID(pTask); param->pTrans = pJob->conn.pTrans; param->execId = pTask->execId; @@ -1138,6 +1139,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, req.header.vgId = addr->nodeId; req.sId = schMgmt.sId; req.queryId = pJob->queryId; + req.clientId = pTask->clientId; req.taskId = pTask->taskId; req.phyLen = pTask->msgLen; req.sqlLen = strlen(pJob->sql); @@ -1171,6 +1173,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, qMsg.header.contLen = 0; qMsg.sId = schMgmt.sId; qMsg.queryId = pJob->queryId; + qMsg.clientId = pTask->clientId; qMsg.taskId = pTask->taskId; qMsg.refId = pJob->refId; qMsg.execId = pTask->execId; @@ -1226,6 +1229,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, req.header.vgId = addr->nodeId; req.sId = schMgmt.sId; req.queryId = pJob->queryId; + req.clientId = pTask->clientId; req.taskId = pTask->taskId; req.execId = pTask->execId; @@ -1253,6 +1257,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, qMsg.header.contLen = 0; qMsg.sId = schMgmt.sId; qMsg.queryId = pJob->queryId; + qMsg.clientId = pTask->clientId; qMsg.taskId = pTask->taskId; qMsg.refId = pJob->refId; qMsg.execId = *(int32_t*)param; @@ -1310,6 +1315,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, qMsg.header.contLen = 0; qMsg.sId = schMgmt.sId; qMsg.queryId = pJob->queryId; + qMsg.clientId = pTask->clientId; qMsg.taskId = pTask->taskId; qMsg.refId = pJob->refId; qMsg.execId = pTask->execId; diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index fe24633c12..9be0e3fc40 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -66,6 +66,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel * pTask->execId = -1; pTask->failedExecId = -2; pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC; + pTask->clientId = getClientId(); pTask->taskId = schGenTaskId(); schInitTaskRetryTimes(pJob, pTask, pLevel); @@ -305,6 +306,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { SCH_LOCK(SCH_WRITE, &parent->planLock); SDownstreamSourceNode source = { .type = QUERY_NODE_DOWNSTREAM_SOURCE, + .clientId = pTask->clientId, .taskId = pTask->taskId, .schedId = schMgmt.sId, .execId = pTask->execId, @@ -996,8 +998,8 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) { int32_t code = 0; - qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", pStatus->queryId, pStatus->taskId, - pStatus->execId, jobTaskStatusStr(pStatus->status)); + qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", pStatus->queryId, + pStatus->clientId, pStatus->taskId, pStatus->execId, jobTaskStatusStr(pStatus->status)); if (schProcessOnCbBegin(&pJob, &pTask, pStatus->queryId, pStatus->refId, pStatus->taskId)) { continue; @@ -1043,13 +1045,14 @@ int32_t schHandleExplainRes(SArray *pExplainRes) { continue; } - qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg", localRsp->qId, localRsp->tId); + qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg", + localRsp->qId, localRsp->cId, localRsp->tId); pJob = NULL; (void)schAcquireJob(localRsp->rId, &pJob); if (NULL == pJob) { - qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, localRsp->qId, - localRsp->tId, localRsp->rId); + qWarn("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, + localRsp->qId, localRsp->cId, localRsp->tId, localRsp->rId); SCH_ERR_JRET(TSDB_CODE_QRY_JOB_NOT_EXIST); } @@ -1068,8 +1071,8 @@ int32_t schHandleExplainRes(SArray *pExplainRes) { (void)schReleaseJob(pJob->refId); - qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x", localRsp->qId, - localRsp->tId, code); + qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x", + localRsp->qId, localRsp->cId, localRsp->tId, code); SCH_ERR_JRET(code); @@ -1147,8 +1150,8 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) { } } - SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, - pTask->execId, &qwMsg, explainRes)); + SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->clientId, pTask->taskId, + pJob->refId, pTask->execId, &qwMsg, explainRes)); if (SCH_IS_EXPLAIN_JOB(pJob)) { SCH_ERR_RET(schHandleExplainRes(explainRes)); @@ -1407,8 +1410,8 @@ int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) { } } - SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, - pTask->execId, &pRsp, explainRes)); + SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->clientId, pTask->taskId, + pJob->refId, pTask->execId, &pRsp, explainRes)); if (SCH_IS_EXPLAIN_JOB(pJob)) { SCH_ERR_RET(schHandleExplainRes(explainRes)); diff --git a/source/libs/scheduler/src/schUtil.c b/source/libs/scheduler/src/schUtil.c index 4697de6f28..ac34099417 100644 --- a/source/libs/scheduler/src/schUtil.c +++ b/source/libs/scheduler/src/schUtil.c @@ -293,6 +293,18 @@ void schCloseJobRef(void) { } } +int32_t initClientId(void) { + int32_t code = taosGetSystemUUIDU64(&schMgmt.clientId); + if (code != TSDB_CODE_SUCCESS) { + qError("failed to generate clientId since %s", tstrerror(code)); + SCH_ERR_RET(code); + } + qInfo("initialize"); + return TSDB_CODE_SUCCESS; +} + +uint64_t getClientId(void) { return schMgmt.clientId; } + uint64_t schGenTaskId(void) { return atomic_add_fetch_64(&schMgmt.taskId, 1); } #ifdef BUILD_NO_CALL diff --git a/tests/script/api/sameReqidTest.c b/tests/script/api/sameReqidTest.c new file mode 100644 index 0000000000..7507619886 --- /dev/null +++ b/tests/script/api/sameReqidTest.c @@ -0,0 +1,406 @@ +// sample code to verify multiple queries with the same reqid +// to compile: gcc -o sameReqdiTest sameReqidTest.c -ltaos + +#include +#include +#include +#include +#include "taos.h" + +#define NUM_ROUNDS 10 +#define CONST_REQ_ID 12345 +#define TEST_DB "test" + +#define CHECK_CONDITION(condition, prompt, errstr) \ + do { \ + if (!(condition)) { \ + printf("\033[31m[%s:%d] failed to " prompt ", reason: %s\033[0m\n", __func__, __LINE__, errstr); \ + exit(EXIT_FAILURE); \ + } \ + } while (0) + +#define CHECK_RES(res, prompt) CHECK_CONDITION(taos_errno(res) == 0, prompt, taos_errstr(res)) +#define CHECK_CODE(code, prompt) CHECK_CONDITION(code == 0, prompt, taos_errstr(NULL)) + +static TAOS* getNewConnection() { + const char* host = "127.0.0.1"; + const char* user = "root"; + const char* passwd = "taosdata"; + TAOS* taos = NULL; + + taos_options(TSDB_OPTION_TIMEZONE, "GMT-8"); + taos = taos_connect(host, user, passwd, "", 0); + CHECK_CONDITION(taos != NULL, "connect to db", taos_errstr(NULL)); + return taos; +} + +static void prepareData(TAOS* taos) { + TAOS_RES* res = NULL; + int32_t code = 0; + + res = taos_query(taos, "create database if not exists " TEST_DB " precision 'ns'"); + CHECK_RES(res, "create database"); + taos_free_result(res); + usleep(100000); + + code = taos_select_db(taos, TEST_DB); + CHECK_CODE(code, "switch to database"); + + res = taos_query(taos, "create table if not exists meters(ts timestamp, a int) tags(area int)"); + CHECK_RES(res, "create stable meters"); + taos_free_result(res); + + res = taos_query(taos, "create table if not exists t0 using meters tags(0)"); + CHECK_RES(res, "create table t0"); + taos_free_result(res); + + res = taos_query(taos, "create table if not exists t1 using meters tags(1)"); + CHECK_RES(res, "create table t1"); + taos_free_result(res); + + res = taos_query(taos, "create table if not exists t2 using meters tags(2)"); + CHECK_RES(res, "create table t2"); + taos_free_result(res); + + res = taos_query(taos, "create table if not exists t3 using meters tags(3)"); + CHECK_RES(res, "create table t3"); + taos_free_result(res); + + res = taos_query(taos, "create table if not exists t4 using meters tags(4)"); + CHECK_RES(res, "create table t4"); + taos_free_result(res); + + res = taos_query(taos, "create table if not exists t5 using meters tags(5)"); + CHECK_RES(res, "create table t5"); + taos_free_result(res); + + res = taos_query(taos, "create table if not exists t6 using meters tags(6)"); + CHECK_RES(res, "create table t6"); + taos_free_result(res); + + res = taos_query(taos, "create table if not exists t7 using meters tags(7)"); + CHECK_RES(res, "create table t7"); + taos_free_result(res); + + res = taos_query(taos, "create table if not exists t8 using meters tags(8)"); + CHECK_RES(res, "create table t8"); + taos_free_result(res); + + res = taos_query(taos, "create table if not exists t9 using meters tags(9)"); + CHECK_RES(res, "create table t9"); + taos_free_result(res); + + res = taos_query(taos, + "insert into t0 values('2020-01-01 00:00:00.000', 0)" + " ('2020-01-01 00:01:00.000', 0)" + " ('2020-01-01 00:02:00.000', 0)" + " t1 values('2020-01-01 00:00:00.000', 1)" + " ('2020-01-01 00:01:00.000', 1)" + " ('2020-01-01 00:02:00.000', 1)" + " ('2020-01-01 00:03:00.000', 1)" + " t2 values('2020-01-01 00:00:00.000', 2)" + " ('2020-01-01 00:01:00.000', 2)" + " ('2020-01-01 00:01:01.000', 2)" + " ('2020-01-01 00:01:02.000', 2)" + " t3 values('2020-01-01 00:01:02.000', 3)" + " t4 values('2020-01-01 00:01:02.000', 4)" + " t5 values('2020-01-01 00:01:02.000', 5)" + " t6 values('2020-01-01 00:01:02.000', 6)" + " t7 values('2020-01-01 00:01:02.000', 7)" + " t8 values('2020-01-01 00:01:02.000', 8)" + " t9 values('2020-01-01 00:01:02.000', 9)"); + CHECK_RES(res, "insert into meters"); + CHECK_CONDITION(taos_affected_rows(res), "insert into meters", "insufficient count"); + taos_free_result(res); + + res = taos_query( + taos, + "create table if not exists m1 (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 " + "double, bin binary(40), blob nchar(10))"); + CHECK_RES(res, "create table m1"); + taos_free_result(res); + + usleep(1000000); +} + +static void verifySchemaLess(TAOS* taos) { + TAOS_RES* res = NULL; + char* lines[] = { + "st,t1=3i64,t2=4f64,t3=L\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000", + "st,t1=4i64,t3=L\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000", + "st,t2=5f64,t3=L\"ste\" c1=4i64,c2=true,c3=L\"iam\" 1626056811823316532", + "st,t1=4i64,t2=5f64,t3=L\"t4\" c1=3i64,c3=L\"passitagain\",c2=true,c4=5f64 1626006833642000000", + "st,t2=5f64,t3=L\"ste2\" c3=L\"iamszhou\",c2=false 1626056811843316532", + "st,t2=5f64,t3=L\"ste2\" c3=L\"iamszhou\",c2=false,c5=5f64,c6=7u64,c7=32i32,c8=88.88f32 1626056812843316532", + "st,t1=4i64,t3=L\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64,c6=7u64 " + "1626006933640000000", + "st,t1=4i64,t3=L\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64,c6=7u64 " + "1626006933640000000", + "st,t1=4i64,t3=L\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 " + "1626006933641000000"}; + + res = taos_schemaless_insert_with_reqid(taos, lines, sizeof(lines) / sizeof(char*), TSDB_SML_LINE_PROTOCOL, + TSDB_SML_TIMESTAMP_NANO_SECONDS, CONST_REQ_ID); + CHECK_RES(res, "insert schema-less data"); + printf("successfully inserted %d rows\n", taos_affected_rows(res)); + taos_free_result(res); +} + +static int32_t printResult(TAOS_RES* res, int32_t nlimit) { + TAOS_ROW row = NULL; + TAOS_FIELD* fields = NULL; + int32_t numFields = 0; + int32_t nRows = 0; + + numFields = taos_num_fields(res); + fields = taos_fetch_fields(res); + while ((nlimit-- > 0) && (row = taos_fetch_row(res))) { + char temp[256] = {0}; + taos_print_row(temp, row, fields, numFields); + puts(temp); + nRows++; + } + return nRows; +} + +static void verifyQuery(TAOS* taos) { + TAOS_RES* res = NULL; + + res = taos_query_with_reqid(taos, "select * from meters", CONST_REQ_ID); + CHECK_RES(res, "select from meters"); + printResult(res, INT32_MAX); + taos_free_result(res); + + res = taos_query_with_reqid(taos, "select * from t0", CONST_REQ_ID); + CHECK_RES(res, "select from t0"); + printResult(res, INT32_MAX); + taos_free_result(res); + + res = taos_query_with_reqid(taos, "select * from t1", CONST_REQ_ID); + CHECK_RES(res, "select from t1"); + printResult(res, INT32_MAX); + taos_free_result(res); + + res = taos_query_with_reqid(taos, "select * from t2", CONST_REQ_ID); + CHECK_RES(res, "select from t2"); + printResult(res, INT32_MAX); + taos_free_result(res); + + res = taos_query_with_reqid(taos, "select * from t3", CONST_REQ_ID); + CHECK_RES(res, "select from t3"); + printResult(res, INT32_MAX); + taos_free_result(res); + + printf("succeed to read from meters\n"); +} + +void retrieveCallback(void* param, TAOS_RES* res, int32_t nrows) { + if (nrows == 0) { + taos_free_result(res); + } else { + printResult(res, nrows); + taos_fetch_rows_a(res, retrieveCallback, param); + } +} + +void selectCallback(void* param, TAOS_RES* res, int32_t code) { + CHECK_CODE(code, "read async from table"); + taos_fetch_rows_a(res, retrieveCallback, param); +} + +static void verifyQueryAsync(TAOS* taos) { + taos_query_a_with_reqid(taos, "select *from meters", selectCallback, NULL, CONST_REQ_ID); + taos_query_a_with_reqid(taos, "select *from t0", selectCallback, NULL, CONST_REQ_ID); + taos_query_a_with_reqid(taos, "select *from t1", selectCallback, NULL, CONST_REQ_ID); + taos_query_a_with_reqid(taos, "select *from t2", selectCallback, NULL, CONST_REQ_ID); + taos_query_a_with_reqid(taos, "select *from t3", selectCallback, NULL, CONST_REQ_ID); + + sleep(1); +} + +void veriryStmt(TAOS* taos) { + // insert 10 records + struct { + int64_t ts[10]; + int8_t b[10]; + int8_t v1[10]; + int16_t v2[10]; + int32_t v4[10]; + int64_t v8[10]; + float f4[10]; + double f8[10]; + char bin[10][40]; + char blob[10][80]; + } v; + + int32_t* t8_len = malloc(sizeof(int32_t) * 10); + int32_t* t16_len = malloc(sizeof(int32_t) * 10); + int32_t* t32_len = malloc(sizeof(int32_t) * 10); + int32_t* t64_len = malloc(sizeof(int32_t) * 10); + int32_t* float_len = malloc(sizeof(int32_t) * 10); + int32_t* double_len = malloc(sizeof(int32_t) * 10); + int32_t* bin_len = malloc(sizeof(int32_t) * 10); + int32_t* blob_len = malloc(sizeof(int32_t) * 10); + + TAOS_STMT* stmt = taos_stmt_init_with_reqid(taos, CONST_REQ_ID); + TAOS_MULTI_BIND params[10]; + char is_null[10] = {0}; + + params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + params[0].buffer_length = sizeof(v.ts[0]); + params[0].buffer = v.ts; + params[0].length = t64_len; + params[0].is_null = is_null; + params[0].num = 10; + + params[1].buffer_type = TSDB_DATA_TYPE_BOOL; + params[1].buffer_length = sizeof(v.b[0]); + params[1].buffer = v.b; + params[1].length = t8_len; + params[1].is_null = is_null; + params[1].num = 10; + + params[2].buffer_type = TSDB_DATA_TYPE_TINYINT; + params[2].buffer_length = sizeof(v.v1[0]); + params[2].buffer = v.v1; + params[2].length = t8_len; + params[2].is_null = is_null; + params[2].num = 10; + + params[3].buffer_type = TSDB_DATA_TYPE_SMALLINT; + params[3].buffer_length = sizeof(v.v2[0]); + params[3].buffer = v.v2; + params[3].length = t16_len; + params[3].is_null = is_null; + params[3].num = 10; + + params[4].buffer_type = TSDB_DATA_TYPE_INT; + params[4].buffer_length = sizeof(v.v4[0]); + params[4].buffer = v.v4; + params[4].length = t32_len; + params[4].is_null = is_null; + params[4].num = 10; + + params[5].buffer_type = TSDB_DATA_TYPE_BIGINT; + params[5].buffer_length = sizeof(v.v8[0]); + params[5].buffer = v.v8; + params[5].length = t64_len; + params[5].is_null = is_null; + params[5].num = 10; + + params[6].buffer_type = TSDB_DATA_TYPE_FLOAT; + params[6].buffer_length = sizeof(v.f4[0]); + params[6].buffer = v.f4; + params[6].length = float_len; + params[6].is_null = is_null; + params[6].num = 10; + + params[7].buffer_type = TSDB_DATA_TYPE_DOUBLE; + params[7].buffer_length = sizeof(v.f8[0]); + params[7].buffer = v.f8; + params[7].length = double_len; + params[7].is_null = is_null; + params[7].num = 10; + + params[8].buffer_type = TSDB_DATA_TYPE_BINARY; + params[8].buffer_length = sizeof(v.bin[0]); + params[8].buffer = v.bin; + params[8].length = bin_len; + params[8].is_null = is_null; + params[8].num = 10; + + params[9].buffer_type = TSDB_DATA_TYPE_NCHAR; + params[9].buffer_length = sizeof(v.blob[0]); + params[9].buffer = v.blob; + params[9].length = blob_len; + params[9].is_null = is_null; + params[9].num = 10; + + int32_t code = taos_stmt_prepare( + stmt, "insert into ? (ts, b, v1, v2, v4, v8, f4, f8, bin, blob) values(?,?,?,?,?,?,?,?,?,?)", 0); + CHECK_CODE(code, "taos_stmt_prepare"); + + code = taos_stmt_set_tbname(stmt, "m1"); + CHECK_CODE(code, "taos_stmt_set_tbname"); + + int64_t ts = 1591060628000000000; + for (int i = 0; i < 10; ++i) { + v.ts[i] = ts; + ts += 1000000; + is_null[i] = 0; + + v.b[i] = (int8_t)i % 2; + v.v1[i] = (int8_t)i; + v.v2[i] = (int16_t)(i * 2); + v.v4[i] = (int32_t)(i * 4); + v.v8[i] = (int64_t)(i * 8); + v.f4[i] = (float)(i * 40); + v.f8[i] = (double)(i * 80); + for (int j = 0; j < sizeof(v.bin[0]); ++j) { + v.bin[i][j] = (char)(i + '0'); + } + strcpy(v.blob[i], "一二三四五六七八九十"); + + t8_len[i] = sizeof(int8_t); + t16_len[i] = sizeof(int16_t); + t32_len[i] = sizeof(int32_t); + t64_len[i] = sizeof(int64_t); + float_len[i] = sizeof(float); + double_len[i] = sizeof(double); + bin_len[i] = sizeof(v.bin[0]); + blob_len[i] = (int32_t)strlen(v.blob[i]); + } + + code = taos_stmt_bind_param_batch(stmt, params); + CHECK_CODE(code, "taos_stmt_bind_param_batch"); + + code = taos_stmt_add_batch(stmt); + CHECK_CODE(code, "taos_stmt_add_batch"); + + code = taos_stmt_execute(stmt); + CHECK_CODE(code, "taos_stmt_execute"); + + taos_stmt_close(stmt); + + free(t8_len); + free(t16_len); + free(t32_len); + free(t64_len); + free(float_len); + free(double_len); + free(bin_len); + free(blob_len); +} + +int main(int argc, char* argv[]) { + TAOS* taos = NULL; + int32_t code = 0; + + taos = getNewConnection(); + taos_select_db(taos, TEST_DB); + CHECK_CODE(code, "switch to database"); + + printf("************ prepare data *************\n"); + prepareData(taos); + + for (int32_t i = 0; i < NUM_ROUNDS; ++i) { + printf("************ verify schema-less *************\n"); + verifySchemaLess(taos); + + printf("************ verify query *************\n"); + verifyQuery(taos); + + printf("********* verify async query **********\n"); + verifyQueryAsync(taos); + + printf("********* verify stmt query **********\n"); + veriryStmt(taos); + + printf("done\n"); + } + + taos_close(taos); + taos_cleanup(); + + return 0; +}