Merge branch '3.0' of https://github.com/taosdata/TDengine into TD-20251
This commit is contained in:
commit
f1c297680a
|
@ -8,6 +8,7 @@ import DiscordSVG from './discord.svg'
|
|||
import TwitterSVG from './twitter.svg'
|
||||
import YouTubeSVG from './youtube.svg'
|
||||
import LinkedInSVG from './linkedin.svg'
|
||||
import StackOverflowSVG from './stackoverflow.svg'
|
||||
|
||||
You can install and run TDengine on Linux/Windows/macOS machines as well as Docker containers. You can also deploy TDengine as a managed service with TDengine Cloud.
|
||||
|
||||
|
@ -35,10 +36,19 @@ The TDengine Knowledge Map covers the various knowledge points of TDengine, reve
|
|||
|
||||
<table width="100%">
|
||||
<tr align="center" style={{border:0}}>
|
||||
<td width="20%" style={{border:0}}><a href="https://github.com/taosdata/TDengine" target="_blank"><GitHubSVG /><p>Star GitHub</p></a></td>
|
||||
<td width="20%" style={{border:0}}><a href="https://discord.com/invite/VZdSuUg4pS" target="_blank"><DiscordSVG /><p>Join Discord</p></a></td>
|
||||
<td width="20%" style={{border:0}}><a href="https://twitter.com/TDengineDB" target="_blank"><TwitterSVG /><p>Follow Twitter</p></a></td>
|
||||
<td width="20%" style={{border:0}}><a href="https://www.youtube.com/@tdengine" target="_blank"><YouTubeSVG /><p>Subscribe YouTube</p></a></td>
|
||||
<td width="20%" style={{border:0}}><a href="https://www.linkedin.com/company/tdengine" target="_blank"><LinkedInSVG /><p>Follow LinkedIn</p></a></td>
|
||||
<td width="16%" style={{border:0}}><a href="https://github.com/taosdata/TDengine" target="_blank"><GitHubSVG /></a></td>
|
||||
<td width="16%" style={{border:0}}><a href="https://discord.com/invite/VZdSuUg4pS" target="_blank"><DiscordSVG /></a></td>
|
||||
<td width="16%" style={{border:0}}><a href="https://twitter.com/TDengineDB" target="_blank"><TwitterSVG /></a></td>
|
||||
<td width="16%" style={{border:0}}><a href="https://www.youtube.com/@tdengine" target="_blank"><YouTubeSVG /></a></td>
|
||||
<td width="16%" style={{border:0}}><a href="https://www.linkedin.com/company/tdengine" target="_blank"><LinkedInSVG /></a></td>
|
||||
<td width="16%" style={{border:0}}><a href="https://stackoverflow.com/questions/tagged/tdengine" target="_blank"><StackOverflowSVG /></a></td>
|
||||
</tr>
|
||||
<tr align="center" style={{border:0,backgroundColor:'transparent'}}>
|
||||
<td width="16%" style={{border:0,padding:0}}><a href="https://github.com/taosdata/TDengine" target="_blank">Star GitHub</a></td>
|
||||
<td width="16%" style={{border:0,padding:0}}><a href="https://discord.com/invite/VZdSuUg4pS" target="_blank">Join Discord</a></td>
|
||||
<td width="16%" style={{border:0,padding:0}}><a href="https://twitter.com/TDengineDB" target="_blank">Follow Twitter</a></td>
|
||||
<td width="16%" style={{border:0,padding:0}}><a href="https://www.youtube.com/@tdengine" target="_blank">Subscribe YouTube</a></td>
|
||||
<td width="16%" style={{border:0,padding:0}}><a href="https://www.linkedin.com/company/tdengine" target="_blank">Follow LinkedIn</a></td>
|
||||
<td width="16%" style={{border:0,padding:0}}><a href="https://stackoverflow.com/questions/tagged/tdengine" target="_blank">Ask StackOverflow</a></td>
|
||||
</tr>
|
||||
</table>
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
<svg xmlns="http://www.w3.org/2000/svg" viewBox="-8 0 48 48" width="50" height="50">
|
||||
<path d="M26 41v-9h4v13H0V32h4v9h22z" fill="#BCBBBB" />
|
||||
<path
|
||||
d="M23 34l.8-3-16.1-3.3L7 31l16 3zM9.2 23.2l15 7 1.4-3-15-7-1.4 3zm4.2-7.4L26 26.4l2.1-2.5-12.7-10.6-2.1 2.5zM21.5 8l-2.7 2 9.9 13.3 2.7-2L21.5 8zM7 38h16v-3H7v3z"
|
||||
fill="#F48024"
|
||||
/>
|
||||
</svg>
|
After Width: | Height: | Size: 350 B |
|
@ -171,6 +171,7 @@ typedef struct SExchangeLogicNode {
|
|||
SLogicNode node;
|
||||
int32_t srcStartGroupId;
|
||||
int32_t srcEndGroupId;
|
||||
bool seqRecvData;
|
||||
} SExchangeLogicNode;
|
||||
|
||||
typedef struct SMergeLogicNode {
|
||||
|
@ -416,6 +417,7 @@ typedef struct SExchangePhysiNode {
|
|||
int32_t srcEndGroupId;
|
||||
bool singleChannel;
|
||||
SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode
|
||||
bool seqRecvData;
|
||||
} SExchangePhysiNode;
|
||||
|
||||
typedef struct SMergePhysiNode {
|
||||
|
|
|
@ -224,7 +224,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
|
|||
ASSERT(taosArrayGetSize(pStream->tasks) == 1);
|
||||
|
||||
while (1) {
|
||||
SVgObj* pVgroup;
|
||||
SVgObj* pVgroup = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
||||
if (pIter == NULL) break;
|
||||
if (!mndVgroupInDb(pVgroup, pStream->targetDbUid)) {
|
||||
|
@ -258,6 +258,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
|
|||
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
|
||||
ASSERT(pTask->tbSink.pSchemaWrapper);
|
||||
}
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -382,6 +383,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
|
|||
qDestroyQueryPlan(pPlan);
|
||||
return -1;
|
||||
}
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
} else {
|
||||
if (mndAssignTaskToSnode(pMnode, pInnerTask, plan, pSnode) < 0) {
|
||||
sdbRelease(pSdb, pSnode);
|
||||
|
@ -396,6 +398,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
|
|||
qDestroyQueryPlan(pPlan);
|
||||
return -1;
|
||||
}
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -459,6 +462,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
|
|||
pEpInfo->nodeId = pTask->nodeId;
|
||||
pEpInfo->taskId = pTask->taskId;
|
||||
taosArrayPush(pInnerTask->childEpInfo, &pEpInfo);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1187,11 +1187,11 @@ static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName,
|
|||
goto NEXT;
|
||||
}
|
||||
if (pCol->colId > 0 && pCol->colId == colId) {
|
||||
sdbRelease(pSdb, pTopic);
|
||||
nodesDestroyNode(pAst);
|
||||
nodesDestroyList(pNodeList);
|
||||
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
|
||||
mError("topic:%s, check colId:%d conflicted", pTopic->name, pCol->colId);
|
||||
nodesDestroyNode(pAst);
|
||||
nodesDestroyList(pNodeList);
|
||||
sdbRelease(pSdb, pTopic);
|
||||
return -1;
|
||||
}
|
||||
mInfo("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
|
||||
|
@ -1230,11 +1230,11 @@ static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName
|
|||
goto NEXT;
|
||||
}
|
||||
if (pCol->colId > 0 && pCol->colId == colId) {
|
||||
sdbRelease(pSdb, pStream);
|
||||
nodesDestroyNode(pAst);
|
||||
nodesDestroyList(pNodeList);
|
||||
terrno = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
|
||||
mError("stream:%s, check colId:%d conflicted", pStream->name, pCol->colId);
|
||||
nodesDestroyNode(pAst);
|
||||
nodesDestroyList(pNodeList);
|
||||
sdbRelease(pSdb, pStream);
|
||||
return -1;
|
||||
}
|
||||
mInfo("stream:%s, check colId:%d passed", pStream->name, pCol->colId);
|
||||
|
@ -1279,11 +1279,11 @@ static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName,
|
|||
goto NEXT;
|
||||
}
|
||||
if ((pCol->colId) > 0 && (pCol->colId == colId)) {
|
||||
sdbRelease(pSdb, pSma);
|
||||
nodesDestroyNode(pAst);
|
||||
nodesDestroyList(pNodeList);
|
||||
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TSMA;
|
||||
mError("tsma:%s, check colId:%d conflicted", pSma->name, pCol->colId);
|
||||
nodesDestroyNode(pAst);
|
||||
nodesDestroyList(pNodeList);
|
||||
sdbRelease(pSdb, pSma);
|
||||
return -1;
|
||||
}
|
||||
mInfo("tsma:%s, check colId:%d passed", pSma->name, pCol->colId);
|
||||
|
|
|
@ -298,35 +298,24 @@ static int32_t vnodeSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot)
|
|||
static void vnodeSyncApplyMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
||||
SVnode *pVnode = pFsm->data;
|
||||
|
||||
if (pMeta->code == 0) {
|
||||
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
|
||||
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
||||
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
|
||||
rpcMsg.info = pMsg->info;
|
||||
rpcMsg.info.conn.applyIndex = pMeta->index;
|
||||
rpcMsg.info.conn.applyTerm = pMeta->term;
|
||||
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
|
||||
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
||||
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
|
||||
rpcMsg.info = pMsg->info;
|
||||
rpcMsg.info.conn.applyIndex = pMeta->index;
|
||||
rpcMsg.info.conn.applyTerm = pMeta->term;
|
||||
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
|
||||
", weak:%d, code:%d, state:%d %s, type:%s",
|
||||
pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, rpcMsg.info.conn.applyIndex, pMeta->isWeak,
|
||||
pMeta->code, pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType));
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
|
||||
", weak:%d, code:%d, state:%d %s, type:%s",
|
||||
pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, rpcMsg.info.conn.applyIndex, pMeta->isWeak, pMeta->code,
|
||||
pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType));
|
||||
|
||||
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
|
||||
} else {
|
||||
SRpcMsg rsp = {.code = pMeta->code, .info = pMsg->info};
|
||||
vError("vgId:%d, commit-cb execute error, type:%s, index:%" PRId64 ", error:0x%x %s", pVnode->config.vgId,
|
||||
TMSG_INFO(pMsg->msgType), pMeta->index, pMeta->code, tstrerror(pMeta->code));
|
||||
if (rsp.info.handle != NULL) {
|
||||
tmsgSendRsp(&rsp);
|
||||
}
|
||||
}
|
||||
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
|
||||
}
|
||||
|
||||
static void vnodeSyncCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
||||
if (pMeta->isWeak == 0) {
|
||||
vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
|
||||
}
|
||||
vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
|
||||
}
|
||||
|
||||
static void vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
||||
|
@ -420,7 +409,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm) {
|
|||
|
||||
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
|
||||
SVnode *pVnode = pFsm->data;
|
||||
vDebug("vgId:%d, become follower", pVnode->config.vgId);
|
||||
vInfo("vgId:%d, become follower", pVnode->config.vgId);
|
||||
|
||||
taosThreadMutexLock(&pVnode->lock);
|
||||
if (pVnode->blocked) {
|
||||
|
|
|
@ -51,9 +51,9 @@ typedef struct SSourceDataInfo {
|
|||
const char* taskId;
|
||||
} SSourceDataInfo;
|
||||
|
||||
static void destroyExchangeOperatorInfo(void* param);
|
||||
static void freeBlock(void* pParam);
|
||||
static void freeSourceDataInfo(void* param);
|
||||
static void destroyExchangeOperatorInfo(void* param);
|
||||
static void freeBlock(void* pParam);
|
||||
static void freeSourceDataInfo(void* param);
|
||||
static void* setAllSourcesCompleted(SOperatorInfo* pOperator);
|
||||
|
||||
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
|
||||
|
@ -62,7 +62,9 @@ static int32_t getCompletedSources(const SArray* pArray);
|
|||
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
|
||||
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
|
||||
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
|
||||
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf);
|
||||
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock,
|
||||
bool holdDataInBuf);
|
||||
static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo);
|
||||
|
||||
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
|
@ -105,41 +107,33 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
|
|||
if (pRsp->numOfRows == 0) {
|
||||
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
||||
qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64
|
||||
", totalRows:%" PRIu64 ", try next %d/%" PRIzu,
|
||||
", totalRows:%" PRIu64 ", try next %d/%" PRIzu,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows,
|
||||
pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
|
||||
taosMemoryFreeClear(pDataInfo->pRsp);
|
||||
break;
|
||||
}
|
||||
|
||||
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
|
||||
int32_t index = 0;
|
||||
char* pStart = pRetrieveRsp->data;
|
||||
while (index++ < pRetrieveRsp->numOfBlocks) {
|
||||
SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
|
||||
code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
|
||||
if (code != 0) {
|
||||
taosMemoryFreeClear(pDataInfo->pRsp);
|
||||
goto _error;
|
||||
}
|
||||
|
||||
taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
|
||||
code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
|
||||
updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator);
|
||||
pDataInfo->totalRows += pRetrieveRsp->numOfRows;
|
||||
|
||||
if (pRsp->completed == 1) {
|
||||
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
|
||||
" execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
|
||||
", total:%.2f Kb, try next %d/%" PRIzu,
|
||||
" execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
|
||||
", total:%.2f Kb, try next %d/%" PRIzu,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks,
|
||||
pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
|
||||
i + 1, totalSources);
|
||||
pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, i + 1,
|
||||
totalSources);
|
||||
} else {
|
||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
|
||||
" execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb",
|
||||
" execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb",
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks,
|
||||
pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
|
||||
}
|
||||
|
@ -164,7 +158,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
|
|||
}
|
||||
}
|
||||
|
||||
_error:
|
||||
_error:
|
||||
pTaskInfo->code = code;
|
||||
}
|
||||
|
||||
|
@ -306,16 +300,18 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
|
|||
SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
|
||||
qAppendTaskStopInfo(pTaskInfo, &stopInfo);
|
||||
|
||||
pInfo->seqLoadData = false;
|
||||
pInfo->seqLoadData = pExNode->seqRecvData;
|
||||
pInfo->pTransporter = pTransporter;
|
||||
|
||||
setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||
setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo,
|
||||
pTaskInfo);
|
||||
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, destroyExchangeOperatorInfo, NULL);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, destroyExchangeOperatorInfo, NULL);
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
_error:
|
||||
if (pInfo != NULL) {
|
||||
doDestroyExchangeOperatorInfo(pInfo);
|
||||
}
|
||||
|
@ -386,7 +382,8 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
} else {
|
||||
taosMemoryFree(pMsg->pData);
|
||||
pSourceDataInfo->code = code;
|
||||
qDebug("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code), pExchangeInfo);
|
||||
qDebug("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code),
|
||||
pExchangeInfo);
|
||||
}
|
||||
|
||||
pSourceDataInfo->status = EX_SOURCE_DATA_READY;
|
||||
|
@ -531,7 +528,7 @@ void* setAllSourcesCompleted(SOperatorInfo* pOperator) {
|
|||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
|
||||
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
|
||||
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
|
||||
qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
|
||||
GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
|
||||
pLoadInfo->totalElapsed / 1000.0);
|
||||
|
@ -586,10 +583,32 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) {
|
||||
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
|
||||
|
||||
char* pStart = pRetrieveRsp->data;
|
||||
int32_t index = 0;
|
||||
int32_t code = 0;
|
||||
while (index++ < pRetrieveRsp->numOfBlocks) {
|
||||
SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
|
||||
|
||||
code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
|
||||
if (code != 0) {
|
||||
taosMemoryFreeClear(pDataInfo->pRsp);
|
||||
return code;
|
||||
}
|
||||
|
||||
taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
|
||||
SExchangeInfo* pExchangeInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
int32_t code = 0;
|
||||
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
|
||||
int64_t startTs = taosGetTimestampUs();
|
||||
|
||||
|
@ -599,13 +618,15 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
|
||||
pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
|
||||
|
||||
doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
|
||||
tsem_wait(&pExchangeInfo->ready);
|
||||
if (isTaskKilled(pTaskInfo)) {
|
||||
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||
}
|
||||
|
||||
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
|
||||
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
|
||||
|
||||
if (pDataInfo->code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -619,7 +640,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
|
|||
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
|
||||
if (pRsp->numOfRows == 0) {
|
||||
qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64
|
||||
", totalRows:%" PRIu64 " try next",
|
||||
", totalRows:%" PRIu64 " try next",
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pExchangeInfo->current + 1,
|
||||
pDataInfo->totalRows, pLoadInfo->totalRows);
|
||||
|
||||
|
@ -629,14 +650,15 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
|
|||
continue;
|
||||
}
|
||||
|
||||
code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
|
||||
|
||||
char* pStart = pRetrieveRsp->data;
|
||||
int32_t code = extractDataBlockFromFetchRsp(NULL, pStart, NULL, &pStart);
|
||||
|
||||
if (pRsp->completed == 1) {
|
||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64
|
||||
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
|
||||
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
|
||||
pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1,
|
||||
totalSources);
|
||||
|
@ -645,7 +667,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
|
|||
pExchangeInfo->current += 1;
|
||||
} else {
|
||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
|
||||
", totalBytes:%" PRIu64,
|
||||
", totalBytes:%" PRIu64,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
|
||||
pLoadInfo->totalRows, pLoadInfo->totalSize);
|
||||
}
|
||||
|
@ -656,6 +678,10 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
|
|||
taosMemoryFreeClear(pDataInfo->pRsp);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
_error:
|
||||
pTaskInfo->code = code;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
|
||||
|
|
|
@ -3655,6 +3655,11 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
|
|||
setSessionWinOutputInfo(pStUpdated, &winInfo);
|
||||
winRows = updateSessionWindowInfo(&winInfo, startTsCols, endTsCols, groupId, rows, i, pAggSup->gap,
|
||||
pAggSup->pResultRows, pStUpdated, pStDeleted);
|
||||
// coverity scan error
|
||||
if (!winInfo.pOutputBuf) {
|
||||
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &winInfo, &pResult, i, winRows, rows, numOfOutput,
|
||||
pOperator);
|
||||
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||
|
|
|
@ -434,6 +434,7 @@ static int32_t logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicN
|
|||
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
|
||||
COPY_SCALAR_FIELD(srcStartGroupId);
|
||||
COPY_SCALAR_FIELD(srcEndGroupId);
|
||||
COPY_SCALAR_FIELD(seqRecvData);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -1864,6 +1864,7 @@ static int32_t jsonToPhysiAggNode(const SJson* pJson, void* pObj) {
|
|||
static const char* jkExchangePhysiPlanSrcStartGroupId = "SrcStartGroupId";
|
||||
static const char* jkExchangePhysiPlanSrcEndGroupId = "SrcEndGroupId";
|
||||
static const char* jkExchangePhysiPlanSrcEndPoints = "SrcEndPoints";
|
||||
static const char* jkExchangePhysiPlanSeqRecvData = "SeqRecvData";
|
||||
|
||||
static int32_t physiExchangeNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SExchangePhysiNode* pNode = (const SExchangePhysiNode*)pObj;
|
||||
|
@ -1878,6 +1879,9 @@ static int32_t physiExchangeNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodeListToJson(pJson, jkExchangePhysiPlanSrcEndPoints, pNode->pSrcEndPoints);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkExchangePhysiPlanSeqRecvData, pNode->seqRecvData);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -1895,6 +1899,9 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkExchangePhysiPlanSrcEndPoints, &pNode->pSrcEndPoints);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkExchangePhysiPlanSeqRecvData, &pNode->seqRecvData);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -2428,7 +2428,8 @@ enum {
|
|||
PHY_EXCHANGE_CODE_SRC_START_GROUP_ID,
|
||||
PHY_EXCHANGE_CODE_SRC_END_GROUP_ID,
|
||||
PHY_EXCHANGE_CODE_SINGLE_CHANNEL,
|
||||
PHY_EXCHANGE_CODE_SRC_ENDPOINTS
|
||||
PHY_EXCHANGE_CODE_SRC_ENDPOINTS,
|
||||
PHY_EXCHANGE_CODE_SEQ_RECV_DATA
|
||||
};
|
||||
|
||||
static int32_t physiExchangeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||
|
@ -2447,6 +2448,9 @@ static int32_t physiExchangeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeObj(pEncoder, PHY_EXCHANGE_CODE_SRC_ENDPOINTS, nodeListToMsg, pNode->pSrcEndPoints);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeBool(pEncoder, PHY_EXCHANGE_CODE_SEQ_RECV_DATA, pNode->seqRecvData);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -2473,6 +2477,9 @@ static int32_t msgToPhysiExchangeNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
case PHY_EXCHANGE_CODE_SRC_ENDPOINTS:
|
||||
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pSrcEndPoints);
|
||||
break;
|
||||
case PHY_EXCHANGE_CODE_SEQ_RECV_DATA:
|
||||
code = tlvDecodeBool(pTlv, &pNode->seqRecvData);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -3685,9 +3685,19 @@ static int32_t translateInsertProject(STranslateContext* pCxt, SInsertStmt* pIns
|
|||
return addOrderByPrimaryKeyToQuery(pCxt, pPrimaryKeyExpr, pInsert->pQuery);
|
||||
}
|
||||
|
||||
static int32_t translateInsertTable(STranslateContext* pCxt, SNode* pTable) {
|
||||
int32_t code = translateFrom(pCxt, pTable);
|
||||
if (TSDB_CODE_SUCCESS == code && TSDB_CHILD_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType &&
|
||||
TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType) {
|
||||
code = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR,
|
||||
"insert data into super table is not supported");
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t translateInsert(STranslateContext* pCxt, SInsertStmt* pInsert) {
|
||||
pCxt->pCurrStmt = (SNode*)pInsert;
|
||||
int32_t code = translateFrom(pCxt, pInsert->pTable);
|
||||
int32_t code = translateInsertTable(pCxt, pInsert->pTable);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateInsertCols(pCxt, pInsert);
|
||||
}
|
||||
|
@ -7089,9 +7099,10 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) {
|
|||
|
||||
static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta,
|
||||
SVAlterTbReq* pReq) {
|
||||
SSchema* pSchema = getColSchema(pTableMeta, pStmt->colName);
|
||||
SSchema* pSchema = getTagSchema(pTableMeta, pStmt->colName);
|
||||
if (NULL == pSchema) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE);
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE, "Invalid tag name: %s",
|
||||
pStmt->colName);
|
||||
}
|
||||
|
||||
pReq->tagName = strdup(pStmt->colName);
|
||||
|
|
|
@ -36,6 +36,7 @@ static int32_t createQueryLogicNode(SLogicPlanContext* pCxt, SNode* pStmt, SLogi
|
|||
typedef struct SRewriteExprCxt {
|
||||
int32_t errCode;
|
||||
SNodeList* pExprs;
|
||||
bool* pOutputs;
|
||||
} SRewriteExprCxt;
|
||||
|
||||
static void setColumnInfo(SFunctionNode* pFunc, SColumnNode* pCol) {
|
||||
|
@ -63,14 +64,30 @@ static void setColumnInfo(SFunctionNode* pFunc, SColumnNode* pCol) {
|
|||
}
|
||||
|
||||
static EDealRes doRewriteExpr(SNode** pNode, void* pContext) {
|
||||
SRewriteExprCxt* pCxt = (SRewriteExprCxt*)pContext;
|
||||
switch (nodeType(*pNode)) {
|
||||
case QUERY_NODE_COLUMN: {
|
||||
if (NULL != pCxt->pOutputs) {
|
||||
SNode* pExpr;
|
||||
int32_t index = 0;
|
||||
FOREACH(pExpr, pCxt->pExprs) {
|
||||
if (QUERY_NODE_GROUPING_SET == nodeType(pExpr)) {
|
||||
pExpr = nodesListGetNode(((SGroupingSetNode*)pExpr)->pParameterList, 0);
|
||||
}
|
||||
if (nodesEqualNode(pExpr, *pNode)) {
|
||||
pCxt->pOutputs[index] = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_OPERATOR:
|
||||
case QUERY_NODE_LOGIC_CONDITION:
|
||||
case QUERY_NODE_FUNCTION:
|
||||
case QUERY_NODE_CASE_WHEN: {
|
||||
SRewriteExprCxt* pCxt = (SRewriteExprCxt*)pContext;
|
||||
SNode* pExpr;
|
||||
int32_t index = 0;
|
||||
SNode* pExpr;
|
||||
int32_t index = 0;
|
||||
FOREACH(pExpr, pCxt->pExprs) {
|
||||
if (QUERY_NODE_GROUPING_SET == nodeType(pExpr)) {
|
||||
pExpr = nodesListGetNode(((SGroupingSetNode*)pExpr)->pParameterList, 0);
|
||||
|
@ -89,6 +106,9 @@ static EDealRes doRewriteExpr(SNode** pNode, void* pContext) {
|
|||
}
|
||||
nodesDestroyNode(*pNode);
|
||||
*pNode = (SNode*)pCol;
|
||||
if (NULL != pCxt->pOutputs) {
|
||||
pCxt->pOutputs[index] = true;
|
||||
}
|
||||
return DEAL_RES_IGNORE_CHILD;
|
||||
}
|
||||
++index;
|
||||
|
@ -121,7 +141,7 @@ static EDealRes doNameExpr(SNode* pNode, void* pContext) {
|
|||
|
||||
static int32_t rewriteExprForSelect(SNode* pExpr, SSelectStmt* pSelect, ESqlClause clause) {
|
||||
nodesWalkExpr(pExpr, doNameExpr, NULL);
|
||||
SRewriteExprCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pExprs = NULL};
|
||||
SRewriteExprCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pExprs = NULL, .pOutputs = NULL};
|
||||
cxt.errCode = nodesListMakeAppend(&cxt.pExprs, pExpr);
|
||||
if (TSDB_CODE_SUCCESS == cxt.errCode) {
|
||||
nodesRewriteSelectStmt(pSelect, clause, doRewriteExpr, &cxt);
|
||||
|
@ -130,23 +150,50 @@ static int32_t rewriteExprForSelect(SNode* pExpr, SSelectStmt* pSelect, ESqlClau
|
|||
return cxt.errCode;
|
||||
}
|
||||
|
||||
static int32_t rewriteExprsForSelect(SNodeList* pExprs, SSelectStmt* pSelect, ESqlClause clause) {
|
||||
static int32_t cloneRewriteExprs(SNodeList* pExprs, bool* pOutputs, SNodeList** pRewriteExpr) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t index = 0;
|
||||
SNode* pExpr = NULL;
|
||||
FOREACH(pExpr, pExprs) {
|
||||
if (pOutputs[index]) {
|
||||
code = nodesListMakeStrictAppend(pRewriteExpr, nodesCloneNode(pExpr));
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
NODES_DESTORY_LIST(*pRewriteExpr);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t rewriteExprsForSelect(SNodeList* pExprs, SSelectStmt* pSelect, ESqlClause clause,
|
||||
SNodeList** pRewriteExprs) {
|
||||
nodesWalkExprs(pExprs, doNameExpr, NULL);
|
||||
SRewriteExprCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs};
|
||||
SRewriteExprCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs, .pOutputs = NULL};
|
||||
if (NULL != pRewriteExprs) {
|
||||
cxt.pOutputs = taosMemoryCalloc(LIST_LENGTH(pExprs), sizeof(bool));
|
||||
if (NULL == cxt.pOutputs) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
nodesRewriteSelectStmt(pSelect, clause, doRewriteExpr, &cxt);
|
||||
if (TSDB_CODE_SUCCESS == cxt.errCode && NULL != pRewriteExprs) {
|
||||
cxt.errCode = cloneRewriteExprs(pExprs, cxt.pOutputs, pRewriteExprs);
|
||||
}
|
||||
taosMemoryFree(cxt.pOutputs);
|
||||
return cxt.errCode;
|
||||
}
|
||||
|
||||
static int32_t rewriteExpr(SNodeList* pExprs, SNode** pTarget) {
|
||||
nodesWalkExprs(pExprs, doNameExpr, NULL);
|
||||
SRewriteExprCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs};
|
||||
SRewriteExprCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs, .pOutputs = NULL};
|
||||
nodesRewriteExpr(pTarget, doRewriteExpr, &cxt);
|
||||
return cxt.errCode;
|
||||
}
|
||||
|
||||
static int32_t rewriteExprs(SNodeList* pExprs, SNodeList* pTarget) {
|
||||
nodesWalkExprs(pExprs, doNameExpr, NULL);
|
||||
SRewriteExprCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs};
|
||||
SRewriteExprCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs, .pOutputs = NULL};
|
||||
nodesRewriteExprs(pTarget, doRewriteExpr, &cxt);
|
||||
return cxt.errCode;
|
||||
}
|
||||
|
@ -311,7 +358,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
|||
|
||||
// rewrite the expression in subsequent clauses
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = rewriteExprsForSelect(pScan->pScanPseudoCols, pSelect, SQL_CLAUSE_FROM);
|
||||
code = rewriteExprsForSelect(pScan->pScanPseudoCols, pSelect, SQL_CLAUSE_FROM, NULL);
|
||||
}
|
||||
|
||||
pScan->scanType = getScanType(pCxt, pScan->pScanPseudoCols, pScan->pScanCols, pScan->tableType, pSelect->tagScan);
|
||||
|
@ -509,23 +556,20 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
|
|||
|
||||
// rewrite the expression in subsequent clauses
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pAgg->pAggFuncs) {
|
||||
code = rewriteExprsForSelect(pAgg->pAggFuncs, pSelect, SQL_CLAUSE_GROUP_BY);
|
||||
code = rewriteExprsForSelect(pAgg->pAggFuncs, pSelect, SQL_CLAUSE_GROUP_BY, NULL);
|
||||
}
|
||||
|
||||
if (NULL != pSelect->pGroupByList) {
|
||||
if (NULL != pAgg->pGroupKeys) {
|
||||
code = nodesListStrictAppendList(pAgg->pGroupKeys, nodesCloneList(pSelect->pGroupByList));
|
||||
} else {
|
||||
pAgg->pGroupKeys = nodesCloneList(pSelect->pGroupByList);
|
||||
if (NULL == pAgg->pGroupKeys) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pAgg->pGroupKeys = nodesCloneList(pSelect->pGroupByList);
|
||||
if (NULL == pAgg->pGroupKeys) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
// rewrite the expression in subsequent clauses
|
||||
SNodeList* pOutputGroupKeys = NULL;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = rewriteExprsForSelect(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_GROUP_BY);
|
||||
code = rewriteExprsForSelect(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_GROUP_BY, &pOutputGroupKeys);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pHaving) {
|
||||
|
@ -536,9 +580,11 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
|
|||
}
|
||||
|
||||
// set the output
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pAgg->pGroupKeys) {
|
||||
code = createColumnByRewriteExprs(pAgg->pGroupKeys, &pAgg->node.pTargets);
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pOutputGroupKeys) {
|
||||
code = createColumnByRewriteExprs(pOutputGroupKeys, &pAgg->node.pTargets);
|
||||
}
|
||||
nodesDestroyList(pOutputGroupKeys);
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pAgg->pAggFuncs) {
|
||||
code = createColumnByRewriteExprs(pAgg->pAggFuncs, &pAgg->node.pTargets);
|
||||
}
|
||||
|
@ -574,7 +620,7 @@ static int32_t createIndefRowsFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt
|
|||
// indefinite rows functions and _select_values functions
|
||||
int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_SELECT, fmIsVectorFunc, &pIdfRowsFunc->pFuncs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = rewriteExprsForSelect(pIdfRowsFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT);
|
||||
code = rewriteExprsForSelect(pIdfRowsFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT, NULL);
|
||||
}
|
||||
|
||||
// set the output
|
||||
|
@ -612,7 +658,7 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
|
|||
// interp functions and _group_key functions
|
||||
int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_SELECT, isInterpFunc, &pInterpFunc->pFuncs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = rewriteExprsForSelect(pInterpFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT);
|
||||
code = rewriteExprsForSelect(pInterpFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT, NULL);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pFill) {
|
||||
|
@ -656,7 +702,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm
|
|||
|
||||
int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_WINDOW, fmIsWindowClauseFunc, &pWindow->pFuncs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = rewriteExprsForSelect(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW);
|
||||
code = rewriteExprsForSelect(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW, NULL);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
@ -854,10 +900,10 @@ static int32_t createFillLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
|||
|
||||
int32_t code = partFillExprs(pSelect, &pFill->pFillExprs, &pFill->pNotFillExprs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = rewriteExprsForSelect(pFill->pFillExprs, pSelect, SQL_CLAUSE_FILL);
|
||||
code = rewriteExprsForSelect(pFill->pFillExprs, pSelect, SQL_CLAUSE_FILL, NULL);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = rewriteExprsForSelect(pFill->pNotFillExprs, pSelect, SQL_CLAUSE_FILL);
|
||||
code = rewriteExprsForSelect(pFill->pNotFillExprs, pSelect, SQL_CLAUSE_FILL, NULL);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createColumnByRewriteExprs(pFill->pFillExprs, &pFill->node.pTargets);
|
||||
|
@ -1066,7 +1112,7 @@ static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe
|
|||
|
||||
// rewrite the expression in subsequent clauses
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = rewriteExprsForSelect(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_DISTINCT);
|
||||
code = rewriteExprsForSelect(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_DISTINCT, NULL);
|
||||
}
|
||||
|
||||
// set the output
|
||||
|
|
|
@ -1476,19 +1476,33 @@ static bool partTagsHasIndefRowsSelectFunc(SNodeList* pFuncs) {
|
|||
return false;
|
||||
}
|
||||
|
||||
static int32_t partTagsRewriteGroupTagsToFuncs(SNodeList* pGroupTags, int32_t start, SNodeList* pAggFuncs) {
|
||||
bool hasIndefRowsSelectFunc = partTagsHasIndefRowsSelectFunc(pAggFuncs);
|
||||
static bool partTagsNeedOutput(SNode* pExpr, SNodeList* pTargets) {
|
||||
SNode* pOutput = NULL;
|
||||
FOREACH(pOutput, pTargets) {
|
||||
if (QUERY_NODE_COLUMN == nodeType(pExpr)) {
|
||||
if (nodesEqualNode(pExpr, pOutput)) {
|
||||
return true;
|
||||
}
|
||||
} else if (0 == strcmp(((SExprNode*)pExpr)->aliasName, ((SColumnNode*)pOutput)->colName)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static int32_t partTagsRewriteGroupTagsToFuncs(SNodeList* pGroupTags, int32_t start, SAggLogicNode* pAgg) {
|
||||
bool hasIndefRowsSelectFunc = partTagsHasIndefRowsSelectFunc(pAgg->pAggFuncs);
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t index = 0;
|
||||
SNode* pNode = NULL;
|
||||
FOREACH(pNode, pGroupTags) {
|
||||
if (index++ < start) {
|
||||
if (index++ < start || !partTagsNeedOutput(pNode, pAgg->node.pTargets)) {
|
||||
continue;
|
||||
}
|
||||
if (hasIndefRowsSelectFunc) {
|
||||
code = nodesListStrictAppend(pAggFuncs, partTagsCreateWrapperFunc("_select_value", pNode));
|
||||
code = nodesListStrictAppend(pAgg->pAggFuncs, partTagsCreateWrapperFunc("_select_value", pNode));
|
||||
} else {
|
||||
code = nodesListStrictAppend(pAggFuncs, partTagsCreateWrapperFunc("_group_key", pNode));
|
||||
code = nodesListStrictAppend(pAgg->pAggFuncs, partTagsCreateWrapperFunc("_group_key", pNode));
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
break;
|
||||
|
@ -1541,7 +1555,7 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
|
|||
}
|
||||
NODES_DESTORY_LIST(pAgg->pGroupKeys);
|
||||
if (TSDB_CODE_SUCCESS == code && start >= 0) {
|
||||
code = partTagsRewriteGroupTagsToFuncs(pScan->pGroupTags, start, pAgg->pAggFuncs);
|
||||
code = partTagsRewriteGroupTagsToFuncs(pScan->pGroupTags, start, pAgg);
|
||||
}
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
|
|
@ -1064,6 +1064,7 @@ static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogic
|
|||
|
||||
pExchange->srcStartGroupId = pExchangeLogicNode->srcStartGroupId;
|
||||
pExchange->srcEndGroupId = pExchangeLogicNode->srcEndGroupId;
|
||||
pExchange->seqRecvData = pExchangeLogicNode->seqRecvData;
|
||||
*pPhyNode = (SPhysiNode*)pExchange;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -292,6 +292,43 @@ static bool stbSplNeedSplitJoin(bool streamQuery, SJoinLogicNode* pJoin) {
|
|||
return true;
|
||||
}
|
||||
|
||||
static SNodeList* stbSplGetPartKeys(SLogicNode* pNode) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
|
||||
return ((SScanLogicNode*)pNode)->pGroupTags;
|
||||
} else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
|
||||
return ((SPartitionLogicNode*)pNode)->pPartitionKeys;
|
||||
} else {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static bool stbSplHasPartTbname(SNodeList* pPartKeys) {
|
||||
if (NULL == pPartKeys) {
|
||||
return false;
|
||||
}
|
||||
SNode* pPartKey = NULL;
|
||||
FOREACH(pPartKey, pPartKeys) {
|
||||
if (QUERY_NODE_GROUPING_SET == nodeType(pPartKey)) {
|
||||
pPartKey = nodesListGetNode(((SGroupingSetNode*)pPartKey)->pParameterList, 0);
|
||||
}
|
||||
if ((QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType) ||
|
||||
(QUERY_NODE_COLUMN == nodeType(pPartKey) && COLUMN_TYPE_TBNAME == ((SColumnNode*)pPartKey)->colType)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static bool stbSplIsPartTableAgg(SAggLogicNode* pAgg) {
|
||||
if (NULL != pAgg->pGroupKeys) {
|
||||
return stbSplHasPartTbname(pAgg->pGroupKeys);
|
||||
}
|
||||
if (1 != LIST_LENGTH(pAgg->node.pChildren)) {
|
||||
return false;
|
||||
}
|
||||
return stbSplHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0)));
|
||||
}
|
||||
|
||||
static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
|
||||
switch (nodeType(pNode)) {
|
||||
case QUERY_NODE_LOGIC_PLAN_SCAN:
|
||||
|
@ -301,7 +338,9 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
|
|||
case QUERY_NODE_LOGIC_PLAN_PARTITION:
|
||||
return streamQuery ? false : stbSplIsMultiTbScanChild(streamQuery, pNode);
|
||||
case QUERY_NODE_LOGIC_PLAN_AGG:
|
||||
return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
|
||||
return (!stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) ||
|
||||
stbSplIsPartTableAgg((SAggLogicNode*)pNode)) &&
|
||||
stbSplHasMultiTbScan(streamQuery, pNode);
|
||||
case QUERY_NODE_LOGIC_PLAN_WINDOW:
|
||||
return stbSplNeedSplitWindow(streamQuery, pNode);
|
||||
case QUERY_NODE_LOGIC_PLAN_SORT:
|
||||
|
@ -676,27 +715,8 @@ static int32_t stbSplSplitState(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
static SNodeList* stbSplGetPartKeys(SLogicNode* pNode) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
|
||||
return ((SScanLogicNode*)pNode)->pGroupTags;
|
||||
} else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
|
||||
return ((SPartitionLogicNode*)pNode)->pPartitionKeys;
|
||||
} else {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static bool stbSplIsPartTbanme(SNodeList* pPartKeys) {
|
||||
if (NULL == pPartKeys || 1 != LIST_LENGTH(pPartKeys)) {
|
||||
return false;
|
||||
}
|
||||
SNode* pPartKey = nodesListGetNode(pPartKeys, 0);
|
||||
return (QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType) ||
|
||||
(QUERY_NODE_COLUMN == nodeType(pPartKey) && COLUMN_TYPE_TBNAME == ((SColumnNode*)pPartKey)->colType);
|
||||
}
|
||||
|
||||
static bool stbSplIsPartTableWinodw(SWindowLogicNode* pWindow) {
|
||||
return stbSplIsPartTbanme(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0)));
|
||||
return stbSplHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0)));
|
||||
}
|
||||
|
||||
static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
|
@ -713,6 +733,17 @@ static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitI
|
|||
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
static bool stbSplNeedSeqRecvData(SLogicNode* pNode) {
|
||||
if (NULL == pNode) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (NULL != pNode->pLimit || NULL != pNode->pSlimit) {
|
||||
return true;
|
||||
}
|
||||
return stbSplNeedSeqRecvData(pNode->pParent);
|
||||
}
|
||||
|
||||
static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
if (pCxt->pPlanCxt->streamQuery) {
|
||||
SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
|
||||
|
@ -728,6 +759,7 @@ static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitIn
|
|||
code = replaceLogicNode(pInfo->pSubplan, pInfo->pSplitNode, (SLogicNode*)pExchange);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pExchange->seqRecvData = stbSplNeedSeqRecvData((SLogicNode*)pExchange);
|
||||
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
|
||||
(SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
|
||||
}
|
||||
|
@ -797,7 +829,17 @@ static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pO
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
static int32_t stbSplSplitAggNodeForPartTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
|
||||
(SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
|
||||
}
|
||||
++(pCxt->groupId);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t stbSplSplitAggNodeForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
SLogicNode* pPartAgg = NULL;
|
||||
int32_t code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
@ -812,6 +854,13 @@ static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo)
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
if (stbSplIsPartTableAgg((SAggLogicNode*)pInfo->pSplitNode)) {
|
||||
return stbSplSplitAggNodeForPartTable(pCxt, pInfo);
|
||||
}
|
||||
return stbSplSplitAggNodeForCrossTable(pCxt, pInfo);
|
||||
}
|
||||
|
||||
static SNode* stbSplCreateColumnNode(SExprNode* pExpr) {
|
||||
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
if (NULL == pCol) {
|
||||
|
|
|
@ -232,7 +232,7 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, S
|
|||
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
|
||||
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode);
|
||||
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h);
|
||||
bool syncNodeHeartbeatTimeout(SSyncNode* pSyncNode);
|
||||
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode);
|
||||
|
||||
// raft state change --------------
|
||||
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term);
|
||||
|
|
|
@ -124,6 +124,7 @@ typedef struct SyncHeartbeat {
|
|||
SyncIndex commitIndex;
|
||||
SyncTerm privateTerm;
|
||||
SyncTerm minMatchIndex;
|
||||
int64_t timeStamp;
|
||||
} SyncHeartbeat;
|
||||
|
||||
typedef struct SyncHeartbeatReply {
|
||||
|
|
|
@ -640,7 +640,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
|
|||
}
|
||||
|
||||
// heartbeat timeout
|
||||
if (syncNodeHeartbeatTimeout(pSyncNode)) {
|
||||
if (syncNodeHeartbeatReplyTimeout(pSyncNode)) {
|
||||
terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
|
||||
sNError(pSyncNode, "failed to sync propose since hearbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
|
||||
TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
|
||||
|
@ -2039,6 +2039,7 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
|
|||
pSyncMsg->commitIndex = pSyncNode->commitIndex;
|
||||
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
|
||||
pSyncMsg->privateTerm = 0;
|
||||
pSyncMsg->timeStamp = taosGetTimestampMs();
|
||||
|
||||
// send msg
|
||||
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
|
||||
|
@ -2094,7 +2095,7 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand
|
|||
return code;
|
||||
}
|
||||
|
||||
bool syncNodeHeartbeatTimeout(SSyncNode* pSyncNode) {
|
||||
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
|
||||
if (pSyncNode->replicaNum == 1) {
|
||||
return false;
|
||||
}
|
||||
|
@ -2148,7 +2149,11 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
|
|||
|
||||
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||
SyncHeartbeat* pMsg = pRpcMsg->pCont;
|
||||
syncLogRecvHeartbeat(ths, pMsg, "");
|
||||
|
||||
int64_t tsMs = taosGetTimestampMs();
|
||||
char buf[128];
|
||||
snprintf(buf, sizeof(buf), "recv local time:%" PRId64, tsMs);
|
||||
syncLogRecvHeartbeat(ths, pMsg, buf);
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
(void)syncBuildHeartbeatReply(&rpcMsg, ths->vgId);
|
||||
|
@ -2161,6 +2166,8 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
pMsgReply->timeStamp = taosGetTimestampMs();
|
||||
|
||||
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
|
||||
syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
|
||||
|
||||
syncNodeResetElectTimer(ths);
|
||||
ths->minMatchIndex = pMsg->minMatchIndex;
|
||||
|
||||
|
@ -2220,9 +2227,11 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
|
||||
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||
SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
|
||||
syncLogRecvHeartbeatReply(ths, pMsg, "");
|
||||
|
||||
int64_t tsMs = taosGetTimestampMs();
|
||||
char buf[128];
|
||||
snprintf(buf, sizeof(buf), "recv local time:%" PRId64, tsMs);
|
||||
syncLogRecvHeartbeatReply(ths, pMsg, buf);
|
||||
|
||||
// update last reply time, make decision whether the other node is alive or not
|
||||
syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);
|
||||
|
@ -2500,6 +2509,8 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
|
|||
SRpcMsg rpcMsg = {0};
|
||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
|
||||
sTrace("do commit index:%" PRId64 ", type:%s", i, TMSG_INFO(pEntry->msgType));
|
||||
|
||||
// user commit
|
||||
if ((ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType)) {
|
||||
bool internalExecute = true;
|
||||
|
@ -2507,7 +2518,8 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
|
|||
internalExecute = false;
|
||||
}
|
||||
|
||||
sNTrace(ths, "commit index:%" PRId64 ", internal:%d", i, internalExecute);
|
||||
sNTrace(ths, "user commit index:%" PRId64 ", internal:%d, type:%s", i, internalExecute,
|
||||
TMSG_INFO(pEntry->msgType));
|
||||
|
||||
// execute fsm in apply thread, or execute outside syncPropose
|
||||
if (internalExecute) {
|
||||
|
|
|
@ -213,9 +213,11 @@ int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcM
|
|||
}
|
||||
|
||||
int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
|
||||
int64_t ts = taosGetTimestampMs();
|
||||
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
SRpcMsg rpcMsg = {0};
|
||||
if (syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId) != 0) {
|
||||
sError("vgId:%d, build sync-heartbeat error", pSyncNode->vgId);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -226,6 +228,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
|
|||
pSyncMsg->commitIndex = pSyncNode->commitIndex;
|
||||
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
|
||||
pSyncMsg->privateTerm = 0;
|
||||
pSyncMsg->timeStamp = ts;
|
||||
|
||||
// send msg
|
||||
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "syncUtil.h"
|
||||
#include "syncIndexMgr.h"
|
||||
#include "syncMessage.h"
|
||||
#include "syncRaftCfg.h"
|
||||
#include "syncRaftStore.h"
|
||||
|
@ -175,6 +176,36 @@ void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) {
|
|||
}
|
||||
}
|
||||
|
||||
// for leader
|
||||
static void syncHearbeatReplyTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
|
||||
int32_t len = 5;
|
||||
|
||||
for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
|
||||
int64_t tsMs = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->replicasId[i]));
|
||||
|
||||
if (i < pSyncNode->replicaNum - 1) {
|
||||
len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 ",", i, tsMs);
|
||||
} else {
|
||||
len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 "}", i, tsMs);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// for follower
|
||||
static void syncHearbeatTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
|
||||
int32_t len = 4;
|
||||
|
||||
for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
|
||||
int64_t tsMs = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->replicasId[i]));
|
||||
|
||||
if (i < pSyncNode->replicaNum - 1) {
|
||||
len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 ",", i, tsMs);
|
||||
} else {
|
||||
len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 "}", i, tsMs);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
|
||||
int32_t len = 1;
|
||||
|
||||
|
@ -221,6 +252,12 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
|
|||
char peerStr[1024] = "{";
|
||||
syncPeerState2Str(pNode, peerStr, sizeof(peerStr));
|
||||
|
||||
char hbrTimeStr[256] = "hbr:{";
|
||||
syncHearbeatReplyTime2Str(pNode, hbrTimeStr, sizeof(hbrTimeStr));
|
||||
|
||||
char hbTimeStr[256] = "hb:{";
|
||||
syncHearbeatTime2Str(pNode, hbTimeStr, sizeof(hbTimeStr));
|
||||
|
||||
int32_t quorum = syncNodeDynamicQuorum(pNode);
|
||||
|
||||
char eventLog[512]; // {0};
|
||||
|
@ -243,12 +280,13 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
|
|||
"%s"
|
||||
", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64
|
||||
", snap-tm:%" PRIu64 ", sby:%d, aq:%d, snaping:%" PRId64 ", r-num:%d, lcfg:%" PRId64
|
||||
", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
|
||||
", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s, %s, %s",
|
||||
pNode->vgId, syncStr(pNode->state), eventLog, currentTerm, pNode->commitIndex, logBeginIndex,
|
||||
logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
|
||||
pNode->pRaftCfg->isStandBy, aqItems, pNode->snapshottingIndex, pNode->replicaNum,
|
||||
pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum,
|
||||
pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr);
|
||||
pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr, hbTimeStr,
|
||||
hbrTimeStr);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -395,9 +433,8 @@ void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const
|
|||
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||
|
||||
sNTrace(pSyncNode,
|
||||
"send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64
|
||||
"}, %s",
|
||||
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);
|
||||
"send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64 "}, %s",
|
||||
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s);
|
||||
}
|
||||
|
||||
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
|
||||
|
@ -406,9 +443,9 @@ void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const
|
|||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||
|
||||
sNTrace(pSyncNode,
|
||||
"recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64
|
||||
"recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64
|
||||
"}, %s",
|
||||
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);
|
||||
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s);
|
||||
}
|
||||
|
||||
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
|
||||
|
@ -416,8 +453,8 @@ void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p
|
|||
uint16_t port;
|
||||
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||
|
||||
sNTrace(pSyncNode, "send sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port,
|
||||
pMsg->term, pMsg->privateTerm, s);
|
||||
sNTrace(pSyncNode, "send sync-heartbeat-reply from %s:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s", host, port,
|
||||
pMsg->term, pMsg->timeStamp, s);
|
||||
}
|
||||
|
||||
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
|
||||
|
|
|
@ -417,8 +417,8 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/user_control.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/fsync.py
|
||||
,,n,system-test,python3 ./test.py -f 0-others/compatibility.py
|
||||
#,,,system-test,python3 ./test.py -f 1-insert/alter_database.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py
|
||||
,,,system-test,python3 ./test.py -f 1-insert/alter_database.py
|
||||
,,,system-test,python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py
|
||||
,,,system-test,python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py
|
||||
,,,system-test,python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py
|
||||
|
@ -436,7 +436,7 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/table_param_ttl.py -R
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/update_data_muti_rows.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/db_tb_name_check.py
|
||||
,,,system-test,python3 ./test.py -f 1-insert/database_pre_suf.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/database_pre_suf.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/InsertFuturets.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/abs.py
|
||||
|
@ -561,8 +561,8 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sample.py -R
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sin.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sin.py -R
|
||||
,,,system-test,python3 ./test.py -f 2-query/smaTest.py
|
||||
,,,system-test,python3 ./test.py -f 2-query/smaTest.py -R
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/smaTest.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/smaTest.py -R
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -R
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/spread.py
|
||||
|
@ -595,8 +595,8 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/Today.py -R
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/top.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/top.py -R
|
||||
,,,system-test,python3 ./test.py -f 2-query/tsbsQuery.py
|
||||
,,,system-test,python3 ./test.py -f 2-query/tsbsQuery.py -R
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsbsQuery.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsbsQuery.py -R
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ttl_comment.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ttl_comment.py -R
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/twa.py
|
||||
|
@ -617,8 +617,8 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_childtable.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_normaltable.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/keep_expired.py
|
||||
,,,system-test,python3 ./test.py -f 1-insert/drop.py
|
||||
,,,system-test,python3 ./test.py -f 1-insert/drop.py -N 3 -M 3 -i False -n 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/drop.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/drop.py -N 3 -M 3 -i False -n 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/join2.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/union1.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/concat2.py
|
||||
|
@ -820,7 +820,7 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_partition.py -Q 2
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py -Q 2
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row.py -Q 2
|
||||
,,,system-test,python3 ./test.py -f 2-query/tsbsQuery.py -Q 2
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsbsQuery.py -Q 2
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -Q 2
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py -Q 2
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/between.py -Q 3
|
||||
|
@ -913,7 +913,7 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_partition.py -Q 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py -Q 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row.py -Q 3
|
||||
,,,system-test,python3 ./test.py -f 2-query/tsbsQuery.py -Q 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsbsQuery.py -Q 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -Q 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp.py -Q 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py -Q 3
|
||||
|
|
|
@ -312,8 +312,14 @@ class TDDnode:
|
|||
cmd = "mintty -h never %s -c %s" % (
|
||||
binPath, self.cfgDir)
|
||||
else:
|
||||
cmd = "nohup %s -c %s > /dev/null 2>&1 & " % (
|
||||
binPath, self.cfgDir)
|
||||
if self.asan:
|
||||
asanDir = "%s/sim/asan/dnode%d.asan" % (
|
||||
self.path, self.index)
|
||||
cmd = "nohup %s -c %s > /dev/null 2> %s & " % (
|
||||
binPath, self.cfgDir, asanDir)
|
||||
else:
|
||||
cmd = "nohup %s -c %s > /dev/null 2>&1 & " % (
|
||||
binPath, self.cfgDir)
|
||||
else:
|
||||
valgrindCmdline = "valgrind --log-file=\"%s/../log/valgrind.log\" --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes"%self.cfgDir
|
||||
|
||||
|
@ -748,7 +754,7 @@ class TDDnodes:
|
|||
tdLog.exit("index:%d should on a scale of [1, 10]" % (index))
|
||||
|
||||
def StopAllSigint(self):
|
||||
tdLog.info("stop all dnodes sigint")
|
||||
tdLog.info("stop all dnodes sigint, asan:%d" % self.asan)
|
||||
if self.asan:
|
||||
tdLog.info("execute script: %s" % self.stopDnodesSigintPath)
|
||||
os.system(self.stopDnodesSigintPath)
|
||||
|
@ -756,7 +762,7 @@ class TDDnodes:
|
|||
return
|
||||
|
||||
def stopAll(self):
|
||||
tdLog.info("stop all dnodes")
|
||||
tdLog.info("stop all dnodes, asan:%d" % self.asan)
|
||||
if self.asan:
|
||||
tdLog.info("execute script: %s" % self.stopDnodesPath)
|
||||
os.system(self.stopDnodesPath)
|
||||
|
|
|
@ -44,8 +44,8 @@ class TDTestCase:
|
|||
def run(self):
|
||||
# insert data
|
||||
dbname = "db"
|
||||
self.insert_data1(f"{dbname}.t1", self.ts, 1000*10000)
|
||||
self.insert_data1(f"{dbname}.t4", self.ts, 1000*10000)
|
||||
self.insert_data1(f"{dbname}.t1", self.ts, 10*10000)
|
||||
self.insert_data1(f"{dbname}.t4", self.ts, 10*10000)
|
||||
# test base case
|
||||
# self.test_case1()
|
||||
tdLog.debug(" LIMIT test_case1 ............ [OK]")
|
||||
|
@ -53,7 +53,6 @@ class TDTestCase:
|
|||
# self.test_case2()
|
||||
tdLog.debug(" LIMIT test_case2 ............ [OK]")
|
||||
|
||||
|
||||
# stop
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
|
@ -77,15 +76,17 @@ class TDTestCase:
|
|||
|
||||
# insert data1
|
||||
def insert_data(self, tbname, ts_start, count):
|
||||
pre_insert = "insert into %s values"%tbname
|
||||
pre_insert = "insert into %s values" % tbname
|
||||
sql = pre_insert
|
||||
tdLog.debug("insert table %s rows=%d ..."%(tbname, count))
|
||||
tdLog.debug("insert table %s rows=%d ..." % (tbname, count))
|
||||
for i in range(count):
|
||||
sql += " (%d,%d)"%(ts_start + i*1000, i )
|
||||
if i >0 and i%30000 == 0:
|
||||
sql += " (%d,%d)" % (ts_start + i*1000, i)
|
||||
if i > 0 and i % 20000 == 0:
|
||||
tdLog.info("%d rows inserted" % i)
|
||||
tdSql.execute(sql)
|
||||
sql = pre_insert
|
||||
# end sql
|
||||
tdLog.info("insert_data end")
|
||||
if sql != pre_insert:
|
||||
tdSql.execute(sql)
|
||||
|
||||
|
@ -93,15 +94,17 @@ class TDTestCase:
|
|||
return
|
||||
|
||||
def insert_data1(self, tbname, ts_start, count):
|
||||
pre_insert = "insert into %s values"%tbname
|
||||
pre_insert = "insert into %s values" % tbname
|
||||
sql = pre_insert
|
||||
tdLog.debug("insert table %s rows=%d ..."%(tbname, count))
|
||||
tdLog.debug("insert table %s rows=%d ..." % (tbname, count))
|
||||
for i in range(count):
|
||||
sql += " (%d,%d,%d)"%(ts_start + i*1000, i , i+1)
|
||||
if i >0 and i%30000 == 0:
|
||||
sql += " (%d,%d,%d)" % (ts_start + i*1000, i, i+1)
|
||||
if i > 0 and i % 20000 == 0:
|
||||
tdLog.info("%d rows inserted" % i)
|
||||
tdSql.execute(sql)
|
||||
sql = pre_insert
|
||||
# end sql
|
||||
tdLog.info("insert_data1 end")
|
||||
if sql != pre_insert:
|
||||
tdSql.execute(sql)
|
||||
|
||||
|
|
|
@ -464,6 +464,7 @@ if __name__ == "__main__":
|
|||
tdDnodes.init(deployPath, masterIp)
|
||||
tdDnodes.setTestCluster(testCluster)
|
||||
tdDnodes.setValgrind(valgrind)
|
||||
tdDnodes.setAsan(asan)
|
||||
tdDnodes.stopAll()
|
||||
for dnode in tdDnodes.dnodes:
|
||||
tdDnodes.deploy(dnode.index,{})
|
||||
|
|
|
@ -39,4 +39,7 @@ void callbackAutoTab(char* sqlstr, TAOS* pSql, bool usedb);
|
|||
// introduction
|
||||
void printfIntroduction();
|
||||
|
||||
// show all commands help
|
||||
void showHelp();
|
||||
|
||||
#endif
|
||||
|
|
|
@ -108,6 +108,7 @@ SWords shellCommands[] = {
|
|||
{"drop topic <topic_name> ;", 0, 0, NULL},
|
||||
{"drop stream <stream_name> ;", 0, 0, NULL},
|
||||
{"explain select", 0, 0, NULL}, // 44 append sub sql
|
||||
{"help;", 0, 0, NULL},
|
||||
{"grant all on <anyword> to <user_name> ;", 0, 0, NULL},
|
||||
{"grant read on <anyword> to <user_name> ;", 0, 0, NULL},
|
||||
{"grant write on <anyword> to <user_name> ;", 0, 0, NULL},
|
||||
|
@ -386,6 +387,8 @@ void showHelp() {
|
|||
drop stream <stream_name> ;\n\
|
||||
----- E ----- \n\
|
||||
explain select clause ...\n\
|
||||
----- H ----- \n\
|
||||
help;\n\
|
||||
----- I ----- \n\
|
||||
insert into <tb_name> values(...) ;\n\
|
||||
insert into <tb_name> using <stb_name> tags(...) values(...) ;\n\
|
||||
|
@ -1478,24 +1481,36 @@ bool matchSelectQuery(TAOS* con, SShellCmd* cmd) {
|
|||
|
||||
// if is input create fields or tags area, return true
|
||||
bool isCreateFieldsArea(char* p) {
|
||||
char* left = strrchr(p, '(');
|
||||
if (left == NULL) {
|
||||
// like 'create table st'
|
||||
return false;
|
||||
}
|
||||
// put to while, support like create table st(ts timestamp, bin1 binary(16), bin2 + blank + TAB
|
||||
char* p1 = strdup(p);
|
||||
bool ret = false;
|
||||
while (1) {
|
||||
char* left = strrchr(p1, '(');
|
||||
if (left == NULL) {
|
||||
// like 'create table st'
|
||||
ret = false;
|
||||
break;
|
||||
}
|
||||
|
||||
char* right = strrchr(p, ')');
|
||||
if (right == NULL) {
|
||||
// like 'create table st( '
|
||||
return true;
|
||||
}
|
||||
char* right = strrchr(p1, ')');
|
||||
if (right == NULL) {
|
||||
// like 'create table st( '
|
||||
ret = true;
|
||||
break;
|
||||
}
|
||||
|
||||
if (left > right) {
|
||||
// like 'create table st( ts timestamp, age int) tags(area '
|
||||
return true;
|
||||
}
|
||||
if (left > right) {
|
||||
// like 'create table st( ts timestamp, age int) tags(area '
|
||||
ret = true;
|
||||
break;
|
||||
}
|
||||
|
||||
return false;
|
||||
// set string end by small for next strrchr search
|
||||
*left = 0;
|
||||
}
|
||||
taosMemoryFree(p1);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool matchCreateTable(TAOS* con, SShellCmd* cmd) {
|
||||
|
|
|
@ -134,6 +134,12 @@ int32_t shellRunCommand(char *command, bool recordHistory) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
// add help or help;
|
||||
if(strcmp(command, "help") == 0 || strcmp(command, "help;") == 0) {
|
||||
showHelp();
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (recordHistory) shellRecordCommandToHistory(command);
|
||||
|
||||
char quote = 0, *cmd = command;
|
||||
|
|
Loading…
Reference in New Issue