Merge branch '3.0' into fix/TD-17400

This commit is contained in:
Ganlin Zhao 2022-07-18 20:06:09 +08:00
commit a3bde4d465
24 changed files with 155 additions and 128 deletions

View File

@ -2071,9 +2071,10 @@ int32_t tDecodeSVDropTbBatchRsp(SDecoder* pCoder, SVDropTbBatchRsp* pRsp);
// TDMT_VND_ALTER_TABLE ===================== // TDMT_VND_ALTER_TABLE =====================
typedef struct { typedef struct {
char* tbName; char* tbName;
int8_t action; int8_t action;
char* colName; char* colName;
int32_t colId;
// TSDB_ALTER_TABLE_ADD_COLUMN // TSDB_ALTER_TABLE_ADD_COLUMN
int8_t type; int8_t type;
int8_t flags; int8_t flags;

View File

@ -26,6 +26,12 @@ extern "C" {
#define SLOT_NAME_LEN TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN #define SLOT_NAME_LEN TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN
typedef enum EDataOrderLevel {
DATA_ORDER_LEVEL_NONE = 1,
DATA_ORDER_LEVEL_IN_BLOCK,
DATA_ORDER_LEVEL_IN_GROUP
} EDataOrderLevel;
typedef struct SLogicNode { typedef struct SLogicNode {
ENodeType type; ENodeType type;
SNodeList* pTargets; // SColumnNode SNodeList* pTargets; // SColumnNode
@ -36,6 +42,8 @@ typedef struct SLogicNode {
uint8_t precision; uint8_t precision;
SNode* pLimit; SNode* pLimit;
SNode* pSlimit; SNode* pSlimit;
EDataOrderLevel requireDataOrder; // requirements for input data
EDataOrderLevel resultDataOrder; // properties of the output data
} SLogicNode; } SLogicNode;
typedef enum EScanType { typedef enum EScanType {
@ -78,7 +86,7 @@ typedef struct SScanLogicNode {
SNodeList* pGroupTags; SNodeList* pGroupTags;
bool groupSort; bool groupSort;
int8_t cacheLastMode; int8_t cacheLastMode;
bool hasNormalCols; // neither tag column nor primary key tag column bool hasNormalCols; // neither tag column nor primary key tag column
} SScanLogicNode; } SScanLogicNode;
typedef struct SJoinLogicNode { typedef struct SJoinLogicNode {
@ -317,6 +325,7 @@ typedef STableScanPhysiNode SStreamScanPhysiNode;
typedef struct SProjectPhysiNode { typedef struct SProjectPhysiNode {
SPhysiNode node; SPhysiNode node;
SNodeList* pProjections; SNodeList* pProjections;
bool mergeDataBlock;
} SProjectPhysiNode; } SProjectPhysiNode;
typedef struct SIndefRowsFuncPhysiNode { typedef struct SIndefRowsFuncPhysiNode {

View File

@ -363,7 +363,11 @@ tmq_list_t* tmq_list_new() {
int32_t tmq_list_append(tmq_list_t* list, const char* src) { int32_t tmq_list_append(tmq_list_t* list, const char* src) {
SArray* container = &list->container; SArray* container = &list->container;
char* topic = strdup(src); if (src == NULL || src[0] == 0) return -1;
char* topic = strdup(src);
if (topic[0] != '`') {
strtolower(topic, src);
}
if (taosArrayPush(container, &topic) == NULL) return -1; if (taosArrayPush(container, &topic) == NULL) return -1;
return 0; return 0;
} }

View File

@ -228,7 +228,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, ui
uint32_t finalNumOfRows = numOfRow1 + numOfRow2; uint32_t finalNumOfRows = numOfRow1 + numOfRow2;
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
// Handle the bitmap // Handle the bitmap
if (finalNumOfRows > *capacity || numOfRow1 == 0) { if (finalNumOfRows > *capacity || (numOfRow1 == 0 && pColumnInfoData->info.bytes != 0)) {
char* p = taosMemoryRealloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * (numOfRow1 + numOfRow2)); char* p = taosMemoryRealloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * (numOfRow1 + numOfRow2));
if (p == NULL) { if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
@ -262,7 +262,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, ui
memcpy(pColumnInfoData->pData + oldLen, pSource->pData, len); memcpy(pColumnInfoData->pData + oldLen, pSource->pData, len);
pColumnInfoData->varmeta.length = len + oldLen; pColumnInfoData->varmeta.length = len + oldLen;
} else { } else {
if (finalNumOfRows > *capacity || numOfRow1 == 0) { if (finalNumOfRows > *capacity || (numOfRow1 == 0 && pColumnInfoData->info.bytes != 0)) {
ASSERT(finalNumOfRows * pColumnInfoData->info.bytes); ASSERT(finalNumOfRows * pColumnInfoData->info.bytes);
char* tmp = taosMemoryRealloc(pColumnInfoData->pData, finalNumOfRows * pColumnInfoData->info.bytes); char* tmp = taosMemoryRealloc(pColumnInfoData->pData, finalNumOfRows * pColumnInfoData->info.bytes);
if (tmp == NULL) { if (tmp == NULL) {

View File

@ -179,6 +179,16 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
} else { } else {
dGTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg); dGTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pWriteQ, pMsg); taosWriteQitem(pVnode->pWriteQ, pMsg);
#if 0 // tests for batch writes
if (pMsg->msgType == TDMT_VND_CREATE_TABLE) {
SRpcMsg *pDup = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
memcpy(pDup, pMsg, sizeof(SRpcMsg));
pDup->pCont = rpcMallocCont(pMsg->contLen);
memcpy(pDup->pCont, pMsg->pCont, pMsg->contLen);
pDup->info.handle = NULL;
taosWriteQitem(pVnode->pWriteQ, pDup);
}
#endif
} }
break; break;
case SYNC_QUEUE: case SYNC_QUEUE:

View File

@ -68,7 +68,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
pRsp->rspOffset = *pOffset; pRsp->rspOffset = *pOffset;
return 0; return 0;
} else { } else {
tqOffsetResetToLog(pOffset, pHandle->snapshotVer + 1); tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
if (qStreamPrepareScan(task, pOffset) < 0) { if (qStreamPrepareScan(task, pOffset) < 0) {
pRsp->rspOffset = *pOffset; pRsp->rspOffset = *pOffset;
return 0; return 0;
@ -106,7 +106,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
} }
if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
tqOffsetResetToLog(pOffset, pHandle->snapshotVer + 1); tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
qStreamPrepareScan(task, pOffset); qStreamPrepareScan(task, pOffset);
continue; continue;
} }

View File

@ -243,7 +243,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
void* data = taosMemoryMalloc(msgLen); void* data = taosMemoryMalloc(msgLen);
if (data == NULL) { if (data == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
qError("failed to copy data for stream since out of memory"); tqError("failed to copy data for stream since out of memory");
return -1; return -1;
} }
memcpy(data, msg, msgLen); memcpy(data, msg, msgLen);

View File

@ -53,6 +53,7 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
*(int64_t *)(dc.data + dc.pos) = uid; *(int64_t *)(dc.data + dc.pos) = uid;
*(int64_t *)(dc.data + dc.pos + 8) = ctime; *(int64_t *)(dc.data + dc.pos + 8) = ctime;
vTrace("vgId:%d, table:%s uid:%" PRId64 " is generated", pVnode->config.vgId, name, uid);
tEndDecode(&dc); tEndDecode(&dc);
} }
@ -381,7 +382,7 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *p
goto end; goto end;
} }
vInfo("vgId:%d, drop ttl table req will be processed, time:%d", pVnode->config.vgId, ttlReq.timestamp); vDebug("vgId:%d, drop ttl table req will be processed, time:%d", pVnode->config.vgId, ttlReq.timestamp);
int32_t ret = metaTtlDropTable(pVnode->pMeta, ttlReq.timestamp, tbUids); int32_t ret = metaTtlDropTable(pVnode->pMeta, ttlReq.timestamp, tbUids);
if (ret != 0) { if (ret != 0) {
goto end; goto end;

View File

@ -17,35 +17,22 @@
#include "vnd.h" #include "vnd.h"
static inline bool vnodeIsMsgBlock(tmsg_t type) { static inline bool vnodeIsMsgBlock(tmsg_t type) {
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_CONFIRM) || (type == TDMT_VND_ALTER_REPLICA); return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_CREATE_TABLE) ||
(type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || (type == TDMT_VND_UPDATE_TAG_VAL);
} }
static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; } static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }
static inline void vnodeAccumBlockMsg(SVnode *pVnode, tmsg_t type) { static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
if (!vnodeIsMsgBlock(type)) return; if (vnodeIsMsgBlock(pMsg->msgType)) {
vTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
int32_t count = atomic_add_fetch_32(&pVnode->blockCount, 1); tsem_wait(&pVnode->syncSem);
vTrace("vgId:%d, accum block, count:%d type:%s", pVnode->config.vgId, count, TMSG_INFO(type)); }
} }
static inline void vnodeWaitBlockMsg(SVnode *pVnode) { static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
int32_t count = atomic_load_32(&pVnode->blockCount); if (vnodeIsMsgBlock(pMsg->msgType)) {
if (count <= 0) return; vTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
vTrace("vgId:%d, wait block finish, count:%d", pVnode->config.vgId, count);
tsem_wait(&pVnode->syncSem);
}
static inline void vnodePostBlockMsg(SVnode *pVnode, tmsg_t type) {
if (!vnodeIsMsgBlock(type)) return;
int32_t count = atomic_load_32(&pVnode->blockCount);
if (count <= 0) return;
count = atomic_sub_fetch_32(&pVnode->blockCount, 1);
vTrace("vgId:%d, post block, count:%d type:%s", pVnode->config.vgId, count, TMSG_INFO(type));
if (count <= 0) {
tsem_post(&pVnode->syncSem); tsem_post(&pVnode->syncSem);
} }
} }
@ -143,6 +130,8 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
int32_t code = 0; int32_t code = 0;
SRpcMsg *pMsg = NULL; SRpcMsg *pMsg = NULL;
vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);
for (int32_t m = 0; m < numOfMsgs; m++) { for (int32_t m = 0; m < numOfMsgs; m++) {
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
@ -165,13 +154,14 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
if (rsp.info.handle != NULL) { if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
} else if (code == 0) {
vnodeWaitBlockMsg(pVnode, pMsg);
} else {
} }
} }
} }
if (code == 0) { if (code < 0) {
vnodeAccumBlockMsg(pVnode, pMsg->msgType);
} else if (code < 0) {
if (terrno == TSDB_CODE_SYN_NOT_LEADER) { if (terrno == TSDB_CODE_SYN_NOT_LEADER) {
vnodeRedirectRpcMsg(pVnode, pMsg); vnodeRedirectRpcMsg(pVnode, pMsg);
} else { } else {
@ -182,15 +172,12 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
} }
} else {
} }
vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code); vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
vnodeWaitBlockMsg(pVnode);
} }
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
@ -213,7 +200,7 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
} }
} }
vnodePostBlockMsg(pVnode, pMsg->msgType); vnodePostBlockMsg(pVnode, pMsg);
if (rsp.info.handle != NULL) { if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
@ -418,7 +405,7 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon
tmsgSendRsp(&rpcMsg); tmsgSendRsp(&rpcMsg);
} }
vnodePostBlockMsg(pVnode, TDMT_VND_ALTER_REPLICA); vnodePostBlockMsg(pVnode, pMsg);
} }
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {

View File

@ -548,37 +548,12 @@ static const char* jkScanLogicPlanScanPseudoCols = "ScanPseudoCols";
static const char* jkScanLogicPlanTableType = "TableType"; static const char* jkScanLogicPlanTableType = "TableType";
static const char* jkScanLogicPlanTableId = "TableId"; static const char* jkScanLogicPlanTableId = "TableId";
static const char* jkScanLogicPlanStableId = "StableId"; static const char* jkScanLogicPlanStableId = "StableId";
static const char* jkScanLogicPlanScanType = "ScanType";
static const char* jkScanLogicPlanScanCount = "ScanCount"; static const char* jkScanLogicPlanScanCount = "ScanCount";
static const char* jkScanLogicPlanReverseScanCount = "ReverseScanCount"; static const char* jkScanLogicPlanReverseScanCount = "ReverseScanCount";
static const char* jkScanLogicPlanTagCond = "TagCond"; static const char* jkScanLogicPlanTagCond = "TagCond";
static const char* jkScanLogicPlanGroupTags = "GroupTags"; static const char* jkScanLogicPlanGroupTags = "GroupTags";
// typedef struct SScanLogicNode {
// uint64_t stableId;
// SVgroupsInfo* pVgroupList;
// EScanType scanType;
// uint8_t scanSeq[2]; // first is scan count, and second is reverse scan count
// STimeWindow scanRange;
// SName tableName;
// bool showRewrite;
// double ratio;
// SNodeList* pDynamicScanFuncs;
// int32_t dataRequired;
// int64_t interval;
// int64_t offset;
// int64_t sliding;
// int8_t intervalUnit;
// int8_t slidingUnit;
// SNode* pTagCond;
// SNode* pTagIndexCond;
// int8_t triggerType;
// int64_t watermark;
// int8_t igExpired;
// SArray* pSmaIndexes;
// SNodeList* pGroupTags;
// bool groupSort;
// } SScanLogicNode;
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj; const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
@ -598,6 +573,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkScanLogicPlanStableId, pNode->stableId); code = tjsonAddIntegerToObject(pJson, jkScanLogicPlanStableId, pNode->stableId);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkScanLogicPlanScanType, pNode->scanType);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkScanLogicPlanScanCount, pNode->scanSeq[0]); code = tjsonAddIntegerToObject(pJson, jkScanLogicPlanScanCount, pNode->scanSeq[0]);
} }
@ -634,6 +612,9 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetUBigIntValue(pJson, jkScanLogicPlanStableId, &pNode->stableId); code = tjsonGetUBigIntValue(pJson, jkScanLogicPlanStableId, &pNode->stableId);
} }
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkScanLogicPlanScanType, pNode->scanType, code);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetUTinyIntValue(pJson, jkScanLogicPlanScanCount, &pNode->scanSeq[0]); code = tjsonGetUTinyIntValue(pJson, jkScanLogicPlanScanCount, &pNode->scanSeq[0]);
} }
@ -1677,6 +1658,7 @@ static int32_t jsonToPhysiSysTableScanNode(const SJson* pJson, void* pObj) {
} }
static const char* jkProjectPhysiPlanProjections = "Projections"; static const char* jkProjectPhysiPlanProjections = "Projections";
static const char* jkProjectPhysiPlanMergeDataBlock = "MergeDataBlock";
static int32_t physiProjectNodeToJson(const void* pObj, SJson* pJson) { static int32_t physiProjectNodeToJson(const void* pObj, SJson* pJson) {
const SProjectPhysiNode* pNode = (const SProjectPhysiNode*)pObj; const SProjectPhysiNode* pNode = (const SProjectPhysiNode*)pObj;
@ -1685,6 +1667,9 @@ static int32_t physiProjectNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkProjectPhysiPlanProjections, pNode->pProjections); code = nodeListToJson(pJson, jkProjectPhysiPlanProjections, pNode->pProjections);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkProjectPhysiPlanMergeDataBlock, pNode->mergeDataBlock);
}
return code; return code;
} }
@ -1696,6 +1681,9 @@ static int32_t jsonToPhysiProjectNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkProjectPhysiPlanProjections, &pNode->pProjections); code = jsonToNodeList(pJson, jkProjectPhysiPlanProjections, &pNode->pProjections);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkProjectPhysiPlanMergeDataBlock, &pNode->mergeDataBlock);
}
return code; return code;
} }

View File

@ -201,7 +201,8 @@ static int32_t calcConstProject(SNode* pProject, bool dual, SNode** pNew) {
} }
static bool isUselessCol(SExprNode* pProj) { static bool isUselessCol(SExprNode* pProj) {
if (QUERY_NODE_FUNCTION == nodeType(pProj) && !fmIsScalarFunc(((SFunctionNode*)pProj)->funcId)) { if (QUERY_NODE_FUNCTION == nodeType(pProj) && !fmIsScalarFunc(((SFunctionNode*)pProj)->funcId) &&
!fmIsPseudoColumnFunc(((SFunctionNode*)pProj)->funcId)) {
return false; return false;
} }
return NULL == ((SExprNode*)pProj)->pAssociation; return NULL == ((SExprNode*)pProj)->pAssociation;

View File

@ -1686,7 +1686,7 @@ static int32_t dnodeToVgroupsInfo(SArray* pDnodes, SVgroupsInfo** pVgsInfo) {
static bool sysTableFromVnode(const char* pTable) { static bool sysTableFromVnode(const char* pTable) {
return (0 == strcmp(pTable, TSDB_INS_TABLE_USER_TABLES)) || return (0 == strcmp(pTable, TSDB_INS_TABLE_USER_TABLES)) ||
(0 == strcmp(pTable, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED) || (0 == strcmp(pTable, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED) ||
(0 == strcmp(pTable, TSDB_INS_TABLE_USER_TAGS))); (0 == strcmp(pTable, TSDB_INS_TABLE_USER_TAGS)));
} }
static bool sysTableFromDnode(const char* pTable) { return 0 == strcmp(pTable, TSDB_INS_TABLE_DNODE_VARIABLES); } static bool sysTableFromDnode(const char* pTable) { return 0 == strcmp(pTable, TSDB_INS_TABLE_DNODE_VARIABLES); }
@ -5968,6 +5968,7 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
if (NULL == pReq->tagName) { if (NULL == pReq->tagName) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pReq->colId = pSchema->colId;
SDataType targetDt = schemaToDataType(pTableMeta->tableInfo.precision, pSchema); SDataType targetDt = schemaToDataType(pTableMeta->tableInfo.precision, pSchema);
if (DEAL_RES_ERROR == translateValueImpl(pCxt, pStmt->pVal, targetDt, true)) { if (DEAL_RES_ERROR == translateValueImpl(pCxt, pStmt->pVal, targetDt, true)) {
@ -6051,6 +6052,7 @@ static int32_t buildDropColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt,
if (NULL == pReq->colName) { if (NULL == pReq->colName) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pReq->colId = pSchema->colId;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -6071,6 +6073,7 @@ static int32_t buildUpdateColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt
if (NULL == pReq->colName) { if (NULL == pReq->colName) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pReq->colId = pSchema->colId;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -20,6 +20,7 @@
typedef struct SLogicPlanContext { typedef struct SLogicPlanContext {
SPlanContext* pPlanCxt; SPlanContext* pPlanCxt;
SLogicNode* pCurrRoot; SLogicNode* pCurrRoot;
bool hasScan;
} SLogicPlanContext; } SLogicPlanContext;
typedef int32_t (*FCreateLogicNode)(SLogicPlanContext*, void*, SLogicNode**); typedef int32_t (*FCreateLogicNode)(SLogicPlanContext*, void*, SLogicNode**);
@ -161,6 +162,10 @@ static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols
return SCAN_TYPE_STREAM; return SCAN_TYPE_STREAM;
} }
if (TSDB_SYSTEM_TABLE == tableType) {
return SCAN_TYPE_SYSTEM_TABLE;
}
if (NULL == pScanCols) { if (NULL == pScanCols) {
return NULL == pScanPseudoCols return NULL == pScanPseudoCols
? SCAN_TYPE_TABLE ? SCAN_TYPE_TABLE
@ -169,17 +174,6 @@ static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols
: SCAN_TYPE_TABLE); : SCAN_TYPE_TABLE);
} }
if (TSDB_SYSTEM_TABLE == tableType) {
return SCAN_TYPE_SYSTEM_TABLE;
}
SNode* pCol = NULL;
FOREACH(pCol, pScanCols) {
if (COLUMN_TYPE_COLUMN == ((SColumnNode*)pCol)->colType) {
return SCAN_TYPE_TABLE;
}
}
return SCAN_TYPE_TABLE; return SCAN_TYPE_TABLE;
} }
@ -300,6 +294,8 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
nodesDestroyNode((SNode*)pScan); nodesDestroyNode((SNode*)pScan);
} }
pCxt->hasScan = true;
return code; return code;
} }
@ -1339,9 +1335,9 @@ static void doSetLogicNodeParent(SLogicNode* pNode, SLogicNode* pParent) {
static void setLogicNodeParent(SLogicNode* pNode) { doSetLogicNodeParent(pNode, NULL); } static void setLogicNodeParent(SLogicNode* pNode) { doSetLogicNodeParent(pNode, NULL); }
static void setLogicSubplanType(SLogicSubplan* pSubplan) { static void setLogicSubplanType(bool hasScan, SLogicSubplan* pSubplan) {
if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY != nodeType(pSubplan->pNode)) { if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY != nodeType(pSubplan->pNode)) {
pSubplan->subplanType = SUBPLAN_TYPE_SCAN; pSubplan->subplanType = hasScan ? SUBPLAN_TYPE_SCAN : SUBPLAN_TYPE_MERGE;
} else { } else {
SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)pSubplan->pNode; SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)pSubplan->pNode;
pSubplan->subplanType = (MODIFY_TABLE_TYPE_INSERT == pModify->modifyType && NULL != pModify->node.pChildren) pSubplan->subplanType = (MODIFY_TABLE_TYPE_INSERT == pModify->modifyType && NULL != pModify->node.pChildren)
@ -1351,7 +1347,7 @@ static void setLogicSubplanType(SLogicSubplan* pSubplan) {
} }
int32_t createLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan) { int32_t createLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan) {
SLogicPlanContext cxt = {.pPlanCxt = pCxt}; SLogicPlanContext cxt = {.pPlanCxt = pCxt, .pCurrRoot = NULL, .hasScan = false};
SLogicSubplan* pSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN); SLogicSubplan* pSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
if (NULL == pSubplan) { if (NULL == pSubplan) {
@ -1364,7 +1360,7 @@ int32_t createLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan) {
int32_t code = createQueryLogicNode(&cxt, pCxt->pAstRoot, &pSubplan->pNode); int32_t code = createQueryLogicNode(&cxt, pCxt->pAstRoot, &pSubplan->pNode);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
setLogicNodeParent(pSubplan->pNode); setLogicNodeParent(pSubplan->pNode);
setLogicSubplanType(pSubplan); setLogicSubplanType(cxt.hasScan, pSubplan);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {

View File

@ -35,7 +35,8 @@ typedef struct SPhysiPlanContext {
int32_t errCode; int32_t errCode;
int16_t nextDataBlockId; int16_t nextDataBlockId;
SArray* pLocationHelper; SArray* pLocationHelper;
SArray* pExecNodeList; // SArray<SQueryNodeLoad> bool hasScan;
bool hasSysScan;
} SPhysiPlanContext; } SPhysiPlanContext;
static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char* pKey) { static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char* pKey) {
@ -495,8 +496,6 @@ static int32_t createSimpleScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSub
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
taosArrayPush(pCxt->pExecNodeList, &node);
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, pScan, pPhyNode); return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, pScan, pPhyNode);
} }
@ -577,8 +576,6 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
pSubplan->execNode.nodeId = MNODE_HANDLE; pSubplan->execNode.nodeId = MNODE_HANDLE;
pSubplan->execNode.epSet = pCxt->pPlanCxt->mgmtEpSet; pSubplan->execNode.epSet = pCxt->pPlanCxt->mgmtEpSet;
} }
SQueryNodeLoad node = {.addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
taosArrayPush(pCxt->pExecNodeList, &node);
if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_DNODE_VARIABLES)) { if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_DNODE_VARIABLES)) {
pScan->mgmtEpSet = pScanLogicNode->pVgroupList->vgroups->epSet; pScan->mgmtEpSet = pScanLogicNode->pVgroupList->vgroups->epSet;
} else { } else {
@ -586,6 +583,7 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
} }
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName); tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
pCxt->hasSysScan = true;
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode); return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
} }
@ -601,6 +599,7 @@ static int32_t createTableMergeScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
SPhysiNode** pPhyNode) { SPhysiNode** pPhyNode) {
pCxt->hasScan = true;
switch (pScanLogicNode->scanType) { switch (pScanLogicNode->scanType) {
case SCAN_TYPE_TAG: case SCAN_TYPE_TAG:
case SCAN_TYPE_BLOCK_INFO: case SCAN_TYPE_BLOCK_INFO:
@ -1806,23 +1805,31 @@ static void setExplainInfo(SPlanContext* pCxt, SQueryPlan* pPlan) {
} }
} }
static void setExecNodeList(SPhysiPlanContext* pCxt, SArray* pExecNodeList) {
if (NULL == pExecNodeList) {
return;
}
if (pCxt->hasSysScan || !pCxt->hasScan) {
SQueryNodeLoad node = {.addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
taosArrayPush(pExecNodeList, &node);
}
}
int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList) { int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList) {
SPhysiPlanContext cxt = {.pPlanCxt = pCxt, SPhysiPlanContext cxt = {.pPlanCxt = pCxt,
.errCode = TSDB_CODE_SUCCESS, .errCode = TSDB_CODE_SUCCESS,
.nextDataBlockId = 0, .nextDataBlockId = 0,
.pLocationHelper = taosArrayInit(32, POINTER_BYTES), .pLocationHelper = taosArrayInit(32, POINTER_BYTES),
.pExecNodeList = pExecNodeList}; .hasScan = false,
.hasSysScan = false};
if (NULL == cxt.pLocationHelper) { if (NULL == cxt.pLocationHelper) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
if (QUERY_POLICY_VNODE == tsQueryPolicy) {
taosArrayClear(pExecNodeList);
}
int32_t code = doCreatePhysiPlan(&cxt, pLogicPlan, pPlan); int32_t code = doCreatePhysiPlan(&cxt, pLogicPlan, pPlan);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
setExplainInfo(pCxt, *pPlan); setExplainInfo(pCxt, *pPlan);
setExecNodeList(&cxt, pExecNodeList);
} }
destoryPhysiPlanContext(&cxt); destoryPhysiPlanContext(&cxt);

View File

@ -192,6 +192,7 @@ int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode); int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode);
// utils -------------- // utils --------------
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg); int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);

View File

@ -1298,6 +1298,12 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
return ret; return ret;
} }
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
syncNodeStopHeartbeatTimer(pSyncNode);
syncNodeStartHeartbeatTimer(pSyncNode);
return 0;
}
// utils -------------- // utils --------------
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet; SEpSet epSet;

View File

@ -103,10 +103,9 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPg
// if pgno == 0 fetch new btree root leaf page // if pgno == 0 fetch new btree root leaf page
if (pgno == 0) { if (pgno == 0) {
// fetch page & insert into main db // fetch page & insert into main db
// allocate a new child page
SPage *pPage; SPage *pPage;
TXN txn; TXN txn;
tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0); tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
pPager->inTran = 1; pPager->inTran = 1;
@ -118,8 +117,6 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPg
return -1; return -1;
} }
// TODO: Need to zero the page
ret = tdbPagerWrite(pPager, pPage); ret = tdbPagerWrite(pPager, pPage);
if (ret < 0) { if (ret < 0) {
return -1; return -1;

View File

@ -473,12 +473,6 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
return -1; return -1;
} }
TXN txn;
tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
SBtreeInitPageArg iArg;
iArg.pBt = pBt;
iArg.flags = 0;
for (int pgIndex = 0; pgIndex < journalSize; ++pgIndex) { for (int pgIndex = 0; pgIndex < journalSize; ++pgIndex) {
// read pgno & the page from journal // read pgno & the page from journal
SPgno pgno; SPgno pgno;
@ -494,20 +488,6 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
return -1; return -1;
} }
/*
ret = tdbPagerFetchPage(pPager, &pgno, &pPage, tdbBtreeInitPage, &iArg, &txn);
if (ret < 0) {
return -1;
}
// write the page to db
ret = tdbPagerWritePageToDB(pPager, pPage);
if (ret < 0) {
return -1;
}
tdbPCacheRelease(pPager->pCache, pPage, &txn);
*/
i64 offset = pPager->pageSize * (pgno - 1); i64 offset = pPager->pageSize * (pgno - 1);
if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) { if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
ASSERT(0); ASSERT(0);
@ -523,8 +503,6 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
tdbOsFSync(pPager->fd); tdbOsFSync(pPager->fd);
tdbTxnClose(&txn);
tdbOsFree(pageBuf); tdbOsFree(pageBuf);
tdbOsClose(jfd); tdbOsClose(jfd);

View File

@ -312,6 +312,7 @@
./test.sh -f tsim/valgrind/checkError3.sim ./test.sh -f tsim/valgrind/checkError3.sim
./test.sh -f tsim/valgrind/checkError4.sim ./test.sh -f tsim/valgrind/checkError4.sim
./test.sh -f tsim/valgrind/checkError5.sim ./test.sh -f tsim/valgrind/checkError5.sim
./test.sh -f tsim/valgrind/checkError6.sim
# --- vnode # --- vnode
# unsupport ./test.sh -f tsim/vnode/replica3_basic.sim # unsupport ./test.sh -f tsim/vnode/replica3_basic.sim

View File

@ -111,6 +111,15 @@ if $hasleader != 1 then
goto step2 goto step2
endi endi
# sql use db;
# sql create table stb (ts timestamp, c int) tags (t int);
# sql create table t0 using stb tags (0);
# sql insert into t0 values(now, 1);
# sql show db.stables;
# sql show db.tables;
# sql show db.vgroups;
return
sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 binary(16)) comment "abd" sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 binary(16)) comment "abd"
sql create table db.ctb using db.stb tags(101, "102") sql create table db.ctb using db.stb tags(101, "102")
sql insert into db.ctb values(now, 1, "2") sql insert into db.ctb values(now, 1, "2")

View File

@ -26,6 +26,7 @@ print =============== step2: create db
sql create database db sql create database db
sql use db sql use db
sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 float, t3 binary(16)) comment "abd" sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 float, t3 binary(16)) comment "abd"
sql create table db.c1 using db.stb tags(101, 102, "103")
print =============== step3: alter stb print =============== step3: alter stb
sql_error alter table db.stb add column ts int sql_error alter table db.stb add column ts int
@ -42,9 +43,8 @@ sql alter table db.stb drop tag c1
sql alter table db.stb drop tag t5 sql alter table db.stb drop tag t5
sql alter table db.stb MODIFY tag t3 binary(32) sql alter table db.stb MODIFY tag t3 binary(32)
sql alter table db.stb rename tag t1 tx sql alter table db.stb rename tag t1 tx
sql alter table db.stb comment 'abcde' ; sql alter table db.stb comment 'abcde' ;
goto _OVER sql drop table db.stb
print =============== step4: alter tb print =============== step4: alter tb
sql create table tb (ts timestamp, a int) sql create table tb (ts timestamp, a int)
@ -66,6 +66,35 @@ sql alter table tb add column h binary(10)
sql select count(a), count(b), count(c), count(d), count(e), count(f), count(g), count(h) from tb sql select count(a), count(b), count(c), count(d), count(e), count(f), count(g), count(h) from tb
sql select * from tb order by ts desc sql select * from tb order by ts desc
print =============== step5: alter stb and insert data
sql create table stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 float, t3 binary(16)) comment "abd"
sql show db.stables
sql describe stb
sql_error alter table stb add column ts int
sql create table db.ctb using db.stb tags(101, 102, "103")
sql insert into db.ctb values(now, 1, "2")
sql show db.tables
sql select * from db.stb
sql select * from tb
sql alter table stb add column c3 int
sql describe stb
sql select * from db.stb
sql select * from tb
sql insert into db.ctb values(now+1s, 1, 2, 3)
sql select * from db.stb
sql alter table db.stb add column c4 bigint
sql select * from db.stb
sql insert into db.ctb values(now+2s, 1, 2, 3, 4)
sql alter table db.stb drop column c1
sql reset query cache
sql select * from tb
sql insert into db.ctb values(now+3s, 2, 3, 4)
sql select * from db.stb
_OVER: _OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
print =============== check print =============== check

View File

@ -68,7 +68,7 @@ $null=
system_content sh/checkValgrind.sh -n dnode1 system_content sh/checkValgrind.sh -n dnode1
print cmd return result ----> [ $system_content ] print cmd return result ----> [ $system_content ]
if $system_content > 0 then if $system_content > 3 then
return -1 return -1
endi endi

View File

@ -320,7 +320,7 @@ class TDTestCase:
tdSql.prepare() tdSql.prepare()
self.tmqCase1() self.tmqCase1()
# self.tmqCase2() self.tmqCase2()
self.tmqCase3() self.tmqCase3()
def stop(self): def stop(self):

View File

@ -175,7 +175,7 @@ SScript *simBuildScriptObj(char *fileName) {
SScript *simParseScript(char *fileName) { SScript *simParseScript(char *fileName) {
TdFilePtr pFile; TdFilePtr pFile;
int32_t tokenLen, lineNum = 0; int32_t tokenLen, lineNum = 0;
char *buffer = NULL, name[128], *token, *rest; char buffer[10*1024], name[128], *token, *rest;
SCommand *pCmd; SCommand *pCmd;
SScript *script; SScript *script;
@ -195,7 +195,7 @@ SScript *simParseScript(char *fileName) {
simResetParser(); simResetParser();
while (!taosEOFFile(pFile)) { while (!taosEOFFile(pFile)) {
if (taosGetLineFile(pFile, &buffer) == -1) continue; if (taosGetsFile(pFile, sizeof(buffer) - 1, buffer) == -1) continue;
lineNum++; lineNum++;
int32_t cmdlen = (int32_t)strlen(buffer); int32_t cmdlen = (int32_t)strlen(buffer);
@ -240,7 +240,6 @@ SScript *simParseScript(char *fileName) {
return NULL; return NULL;
} }
} }
if(buffer != NULL) taosMemoryFree(buffer);
taosCloseFile(&pFile); taosCloseFile(&pFile);
script = simBuildScriptObj(fileName); script = simBuildScriptObj(fileName);