Merge branch '3.0' into fix/TD-19312
This commit is contained in:
commit
d2e4b0b5f6
|
@ -1731,13 +1731,14 @@ typedef struct {
|
|||
char name[TSDB_STREAM_FNAME_LEN];
|
||||
char sourceDB[TSDB_DB_FNAME_LEN];
|
||||
char targetStbFullName[TSDB_TABLE_FNAME_LEN];
|
||||
int8_t igExists;
|
||||
char* sql;
|
||||
char* ast;
|
||||
int8_t igExists;
|
||||
int8_t triggerType;
|
||||
int8_t igExpired;
|
||||
int8_t fillHistory; // process data inserted before creating stream
|
||||
int64_t maxDelay;
|
||||
int64_t watermark;
|
||||
int8_t igExpired;
|
||||
int32_t numOfTags;
|
||||
SArray* pTags; // array of SField
|
||||
} SCMCreateStreamReq;
|
||||
|
|
|
@ -36,6 +36,7 @@ typedef struct SStreamTask SStreamTask;
|
|||
enum {
|
||||
STREAM_STATUS__NORMAL = 0,
|
||||
STREAM_STATUS__STOP,
|
||||
STREAM_STATUS__INIT,
|
||||
STREAM_STATUS__FAILED,
|
||||
STREAM_STATUS__RECOVER,
|
||||
};
|
||||
|
@ -291,6 +292,9 @@ typedef struct SStreamTask {
|
|||
int64_t recoverSnapVer;
|
||||
int64_t startVer;
|
||||
|
||||
// fill history
|
||||
int8_t fillHistory;
|
||||
|
||||
// children info
|
||||
SArray* childEpInfo; // SArray<SStreamChildEpInfo*>
|
||||
int32_t nextCheckId;
|
||||
|
@ -534,7 +538,7 @@ typedef struct SStreamMeta {
|
|||
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc);
|
||||
void streamMetaClose(SStreamMeta* streamMeta);
|
||||
|
||||
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
||||
// int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
||||
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen);
|
||||
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
|
||||
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
#include "taosdef.h"
|
||||
#include "tarray.h"
|
||||
#include "tcommon.h"
|
||||
#include "tmsg.h"
|
||||
#include "tscalablebf.h"
|
||||
|
||||
|
@ -24,6 +25,11 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef struct SUpdateKey {
|
||||
int64_t tbUid;
|
||||
TSKEY ts;
|
||||
} SUpdateKey;
|
||||
|
||||
typedef struct SUpdateInfo {
|
||||
SArray *pTsBuckets;
|
||||
uint64_t numBuckets;
|
||||
|
@ -41,6 +47,7 @@ typedef struct SUpdateInfo {
|
|||
|
||||
SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark);
|
||||
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark);
|
||||
void updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol);
|
||||
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
|
||||
bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid);
|
||||
void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version);
|
||||
|
|
|
@ -50,6 +50,9 @@ uint64_t MurmurHash3_64(const char *key, uint32_t len);
|
|||
uint32_t taosIntHash_32(const char *key, uint32_t len);
|
||||
uint32_t taosIntHash_64(const char *key, uint32_t len);
|
||||
|
||||
uint32_t taosFastHash(const char *key, uint32_t len);
|
||||
uint32_t taosDJB2Hash(const char *key, uint32_t len);
|
||||
|
||||
_hash_fn_t taosGetDefaultHashFunction(int32_t type);
|
||||
_equal_fn_t taosGetDefaultEqualFunction(int32_t type);
|
||||
|
||||
|
|
|
@ -267,7 +267,7 @@ _OVER:
|
|||
|
||||
int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
SAlterVnodeReplicaReq alterReq = {0};
|
||||
if (tSerializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
|
||||
if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
}
|
||||
|
@ -283,6 +283,12 @@ int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
dInfo("vgId:%d, start to close vnode", vgId);
|
||||
SWrapperCfg wrapperCfg = {
|
||||
.dropped = pVnode->dropped,
|
||||
.vgId = pVnode->vgId,
|
||||
.vgVersion = pVnode->vgVersion,
|
||||
};
|
||||
tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
|
||||
vmCloseVnode(pMgmt, pVnode);
|
||||
|
||||
char path[TSDB_FILENAME_LEN] = {0};
|
||||
|
@ -295,11 +301,22 @@ int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
dInfo("vgId:%d, start to open vnode", vgId);
|
||||
if (vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb) < 0) {
|
||||
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
|
||||
if (pImpl == NULL) {
|
||||
dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
|
||||
dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (vnodeStart(pImpl) != 0) {
|
||||
dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
dInfo("vgId:%d, vnode config is altered", vgId);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -613,6 +613,7 @@ typedef struct {
|
|||
// config
|
||||
int8_t igExpired;
|
||||
int8_t trigger;
|
||||
int8_t fillHistory;
|
||||
int64_t triggerParam;
|
||||
int64_t watermark;
|
||||
// source and target
|
||||
|
|
|
@ -631,29 +631,18 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) {
|
|||
terrno = TSDB_CODE_MND_DB_OPTION_UNCHANGED;
|
||||
|
||||
if (pAlter->buffer > 0 && pAlter->buffer != pDb->cfg.buffer) {
|
||||
#if 0
|
||||
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||
return terrno;
|
||||
#else
|
||||
pDb->cfg.buffer = pAlter->buffer;
|
||||
terrno = 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
if (pAlter->pages > 0 && pAlter->pages != pDb->cfg.pages) {
|
||||
#if 0
|
||||
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||
return terrno;
|
||||
#else
|
||||
pDb->cfg.pages = pAlter->pages;
|
||||
terrno = 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
if (pAlter->pageSize > 0 && pAlter->pageSize != pDb->cfg.pageSize) {
|
||||
#if 1
|
||||
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||
return terrno;
|
||||
#else
|
||||
pDb->cfg.pageSize = pAlter->pageSize;
|
||||
terrno = 0;
|
||||
|
@ -710,13 +699,9 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) {
|
|||
}
|
||||
|
||||
if (pAlter->replications > 0 && pAlter->replications != pDb->cfg.replications) {
|
||||
#if 1
|
||||
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||
#else
|
||||
pDb->cfg.replications = pAlter->replications;
|
||||
pDb->vgVersion++;
|
||||
terrno = 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
return terrno;
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
#include "mndConsumer.h"
|
||||
|
||||
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
|
||||
|
||||
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
|
||||
|
@ -31,6 +32,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
|
|||
|
||||
if (tEncodeI8(pEncoder, pObj->igExpired) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pObj->fillHistory) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->triggerParam) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->watermark) < 0) return -1;
|
||||
|
||||
|
@ -74,10 +76,12 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
|
|||
|
||||
if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1;
|
||||
|
||||
tEndEncode(pEncoder);
|
||||
return pEncoder->pos;
|
||||
}
|
||||
|
||||
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
|
||||
if (tStartDecode(pDecoder) < 0) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
|
||||
|
||||
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
|
||||
|
@ -91,6 +95,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
|
|||
|
||||
if (tDecodeI8(pDecoder, &pObj->igExpired) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pObj->fillHistory) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1;
|
||||
|
||||
|
@ -134,6 +139,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
|
|||
|
||||
if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1;
|
||||
|
||||
tEndDecode(pDecoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -239,6 +239,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
|
|||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
pTask->fillHistory = pStream->fillHistory;
|
||||
mndAddTaskToTaskSet(tasks, pTask);
|
||||
|
||||
pTask->nodeId = pVgroup->vgId;
|
||||
|
@ -270,6 +271,7 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) {
|
|||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
pTask->fillHistory = pStream->fillHistory;
|
||||
mndAddTaskToTaskSet(tasks, pTask);
|
||||
|
||||
ASSERT(pStream->fixedSinkVg.vgId == pStream->fixedSinkVgId);
|
||||
|
@ -356,6 +358,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
|
|||
qDestroyQueryPlan(pPlan);
|
||||
return -1;
|
||||
}
|
||||
pInnerTask->fillHistory = pStream->fillHistory;
|
||||
mndAddTaskToTaskSet(taskInnerLevel, pInnerTask);
|
||||
|
||||
pInnerTask->childEpInfo = taosArrayInit(0, sizeof(void*));
|
||||
|
@ -422,6 +425,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
|
|||
qDestroyQueryPlan(pPlan);
|
||||
return -1;
|
||||
}
|
||||
pTask->fillHistory = pStream->fillHistory;
|
||||
mndAddTaskToTaskSet(taskSourceLevel, pTask);
|
||||
|
||||
pTask->triggerParam = 0;
|
||||
|
|
|
@ -143,8 +143,10 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
|
|||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, buf, tlen + 1);
|
||||
if (tDecodeSStreamObj(&decoder, pStream) < 0) {
|
||||
tDecoderClear(&decoder);
|
||||
goto STREAM_DECODE_OVER;
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
|
@ -280,6 +282,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
|||
pObj->trigger = pCreate->triggerType;
|
||||
pObj->triggerParam = pCreate->maxDelay;
|
||||
pObj->watermark = pCreate->watermark;
|
||||
pObj->fillHistory = pCreate->fillHistory;
|
||||
|
||||
memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
|
||||
SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
|
||||
|
@ -686,7 +689,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
|
||||
goto _OVER;
|
||||
}
|
||||
mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb); // hack way
|
||||
mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb);
|
||||
mInfo("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name);
|
||||
|
||||
// create stb for stream
|
||||
|
|
|
@ -323,7 +323,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
|||
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
|
||||
action.pRaw = taosMemoryMalloc(dataLen);
|
||||
if (action.pRaw == NULL) goto _OVER;
|
||||
mTrace("raw:%p, is created", pData);
|
||||
mTrace("raw:%p, is created", action.pRaw);
|
||||
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
|
||||
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
|
||||
action.pRaw = NULL;
|
||||
|
|
|
@ -315,12 +315,14 @@ static void *mndBuildAlterVnodeConfigReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pV
|
|||
return pReq;
|
||||
}
|
||||
|
||||
static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup,
|
||||
static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
|
||||
int32_t *pContLen) {
|
||||
SAlterVnodeReplicaReq alterReq = {0};
|
||||
alterReq.vgId = pVgroup->vgId;
|
||||
alterReq.strict = pDb->cfg.strict;
|
||||
alterReq.replica = pVgroup->replica;
|
||||
SAlterVnodeReplicaReq alterReq = {
|
||||
alterReq.vgId = pVgroup->vgId,
|
||||
alterReq.strict = pDb->cfg.strict,
|
||||
alterReq.replica = pVgroup->replica,
|
||||
alterReq.selfIndex = -1,
|
||||
};
|
||||
|
||||
for (int32_t v = 0; v < pVgroup->replica; ++v) {
|
||||
SReplica *pReplica = &alterReq.replicas[v];
|
||||
|
@ -335,7 +337,7 @@ static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDnodeObj *pDnode, SDb
|
|||
memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
|
||||
mndReleaseDnode(pMnode, pVgidDnode);
|
||||
|
||||
if (pDnode->id == pVgid->dnodeId) {
|
||||
if (dnodeId == pVgid->dnodeId) {
|
||||
alterReq.selfIndex = v;
|
||||
}
|
||||
}
|
||||
|
@ -1004,7 +1006,9 @@ int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pD
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndAddAlterVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType) {
|
||||
int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) { return 0; }
|
||||
|
||||
int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
|
||||
|
@ -1014,7 +1018,31 @@ int32_t mndAddAlterVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgO
|
|||
|
||||
action.pCont = pReq;
|
||||
action.contLen = contLen;
|
||||
action.msgType = msgType;
|
||||
action.msgType = TDMT_VND_ALTER_CONFIG;
|
||||
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndAddAlterVnodeReplicaAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
|
||||
SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
|
||||
if (pDnode == NULL) return -1;
|
||||
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetDnodeEpset(pDnode);
|
||||
mndReleaseDnode(pMnode, pDnode);
|
||||
|
||||
int32_t contLen = 0;
|
||||
void *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, dnodeId, &contLen);
|
||||
if (pReq == NULL) return -1;
|
||||
|
||||
action.pCont = pReq;
|
||||
action.contLen = contLen;
|
||||
action.msgType = TDMT_VND_ALTER_REPLICA;
|
||||
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
|
@ -1070,7 +1098,7 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
|
|||
mInfo("vgId:%d, will add 1 vnodes", pVgroup->vgId);
|
||||
if (mndAddVnodeToVgroup(pMnode, &newVg, pArray) != 0) return -1;
|
||||
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[newVg.replica - 1]) != 0) return -1;
|
||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
||||
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, -1) != 0) return -1;
|
||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
|
||||
|
||||
mInfo("vgId:%d, will remove 1 vnodes", pVgroup->vgId);
|
||||
|
@ -1078,7 +1106,7 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
|
|||
SVnodeGid del = newVg.vnodeGid[vnIndex];
|
||||
newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
|
||||
memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
|
||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
||||
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, -1) != 0) return -1;
|
||||
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1;
|
||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
|
||||
|
||||
|
@ -1152,7 +1180,7 @@ static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDb
|
|||
pGid->syncState = TAOS_SYNC_STATE_ERROR;
|
||||
|
||||
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pGid) != 0) return -1;
|
||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
||||
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, -1) != 0) return -1;
|
||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) return -1;
|
||||
|
||||
return 0;
|
||||
|
@ -1178,7 +1206,7 @@ static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, S
|
|||
memcpy(pGid, &pVgroup->vnodeGid[pVgroup->replica], sizeof(SVnodeGid));
|
||||
memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
|
||||
|
||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
||||
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, -1) != 0) return -1;
|
||||
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &delGid, true) != 0) return -1;
|
||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) return -1;
|
||||
|
||||
|
@ -1538,7 +1566,7 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
|
|||
memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
|
||||
|
||||
if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) {
|
||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, pVgroup, TDMT_VND_ALTER_CONFIG) != 0) return -1;
|
||||
if (mndAddAlterVnodeConfigAction(pMnode, pTrans, pNewDb, pVgroup) != 0) return -1;
|
||||
if (mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, &newVgroup, pVgroup, pArray) != 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
@ -1553,11 +1581,10 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
|
|||
if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1;
|
||||
if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[1]) != 0) return -1;
|
||||
if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[2]) != 0) return -1;
|
||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
||||
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0)
|
||||
return -1;
|
||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
|
||||
}
|
||||
|
||||
if (newVgroup.replica == 3 && pNewDb->cfg.replications == 1) {
|
||||
} else if (newVgroup.replica == 3 && pNewDb->cfg.replications == 1) {
|
||||
mInfo("db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pVgroup->dbName,
|
||||
pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId, pVgroup->vnodeGid[2].dnodeId);
|
||||
|
||||
|
@ -1567,7 +1594,8 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
|
|||
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del2) != 0) return -1;
|
||||
if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true) != 0) return -1;
|
||||
if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del2, true) != 0) return -1;
|
||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
||||
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0)
|
||||
return -1;
|
||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
|
||||
} else {
|
||||
return -1;
|
||||
|
@ -1622,12 +1650,12 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
|
|||
if (newVg1.replica == 1) {
|
||||
if (mndAddVnodeToVgroup(pMnode, &newVg1, pArray) != 0) goto _OVER;
|
||||
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg1, &newVg1.vnodeGid[1]) != 0) goto _OVER;
|
||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg1, TDMT_VND_ALTER_REPLICA) != 0) goto _OVER;
|
||||
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, -1) != 0) goto _OVER;
|
||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
|
||||
} else if (newVg1.replica == 3) {
|
||||
SVnodeGid del1 = {0};
|
||||
if (mndRemoveVnodeFromVgroup(pMnode, &newVg1, pArray, &del1) != 0) goto _OVER;
|
||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg1, TDMT_VND_ALTER_REPLICA) != 0) goto _OVER;
|
||||
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, -1) != 0) goto _OVER;
|
||||
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg1, &del1, true) != 0) goto _OVER;
|
||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
|
||||
} else {
|
||||
|
@ -1645,8 +1673,8 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
|
|||
memcpy(&newVg2.vnodeGid[0], &newVg2.vnodeGid[1], sizeof(SVnodeGid));
|
||||
memset(&newVg1.vnodeGid[1], 0, sizeof(SVnodeGid));
|
||||
|
||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg1, TDMT_VND_ALTER_HASHRANGE) != 0) goto _OVER;
|
||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg2, TDMT_VND_ALTER_HASHRANGE) != 0) goto _OVER;
|
||||
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
|
||||
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, pDb, &newVg2) != 0) goto _OVER;
|
||||
|
||||
// adjust vgroup
|
||||
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray) != 0) goto _OVER;
|
||||
|
|
|
@ -211,6 +211,7 @@ int32_t smaSyncCommit(SSma* pSma);
|
|||
int32_t smaSyncPostCommit(SSma* pSma);
|
||||
int32_t smaPreCommit(SSma* pSma);
|
||||
int32_t smaCommit(SSma* pSma);
|
||||
int32_t smaFinishCommit(SSma* pSma);
|
||||
int32_t smaPostCommit(SSma* pSma);
|
||||
int32_t smaDoRetention(SSma* pSma, int64_t now);
|
||||
|
||||
|
|
|
@ -120,7 +120,7 @@ _err2:
|
|||
_err:
|
||||
taosMemoryFree(pCache);
|
||||
|
||||
metaError("vgId:%d meta open cache failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
|
||||
metaError("vgId:%d, meta open cache failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -78,38 +78,69 @@ int32_t smaCommit(SSma *pSma) { return tdProcessRSmaAsyncCommitImpl(pSma); }
|
|||
int32_t smaPostCommit(SSma *pSma) { return tdProcessRSmaAsyncPostCommitImpl(pSma); }
|
||||
|
||||
/**
|
||||
* @brief set rsma trigger stat active
|
||||
* @brief prepare rsma1/2, and set rsma trigger stat active
|
||||
*
|
||||
* @param pSma
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t smaBegin(SSma *pSma) {
|
||||
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
|
||||
if (!pSmaEnv) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
int32_t code = 0;
|
||||
SVnode *pVnode = pSma->pVnode;
|
||||
|
||||
if ((code = tsdbBegin(VND_RSMA1(pVnode))) < 0) {
|
||||
smaError("vgId:%d, failed to begin rsma1 since %s", TD_VID(pVnode), tstrerror(code));
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
|
||||
if ((code = tsdbBegin(VND_RSMA2(pVnode))) < 0) {
|
||||
smaError("vgId:%d, failed to begin rsma2 since %s", TD_VID(pVnode), tstrerror(code));
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
// set trigger stat
|
||||
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
|
||||
if (!pSmaEnv) {
|
||||
goto _exit;
|
||||
}
|
||||
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
|
||||
int8_t rsmaTriggerStat =
|
||||
atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED, TASK_TRIGGER_STAT_ACTIVE);
|
||||
switch (rsmaTriggerStat) {
|
||||
case TASK_TRIGGER_STAT_PAUSED: {
|
||||
smaDebug("vgId:%d, rsma trigger stat from paused to active", SMA_VID(pSma));
|
||||
smaDebug("vgId:%d, rsma trigger stat from paused to active", TD_VID(pVnode));
|
||||
break;
|
||||
}
|
||||
case TASK_TRIGGER_STAT_INIT: {
|
||||
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE);
|
||||
smaDebug("vgId:%d, rsma trigger stat from init to active", SMA_VID(pSma));
|
||||
smaDebug("vgId:%d, rsma trigger stat from init to active", TD_VID(pVnode));
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE);
|
||||
smaError("vgId:%d, rsma trigger stat %" PRIi8 " is unexpected", SMA_VID(pSma), rsmaTriggerStat);
|
||||
smaWarn("vgId:%d, rsma trigger stat %" PRIi8 " is unexpected", TD_VID(pVnode), rsmaTriggerStat);
|
||||
break;
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_exit:
|
||||
terrno = code;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t smaFinishCommit(SSma *pSma) {
|
||||
int32_t code = 0;
|
||||
SVnode *pVnode = pSma->pVnode;
|
||||
|
||||
if (VND_RSMA1(pVnode) && (code = tsdbFinishCommit(VND_RSMA1(pVnode))) < 0) {
|
||||
smaError("vgId:%d, failed to finish commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(code));
|
||||
goto _exit;
|
||||
}
|
||||
if (VND_RSMA2(pVnode) && (code = tsdbFinishCommit(VND_RSMA2(pVnode))) < 0) {
|
||||
smaError("vgId:%d, failed to finish commit tsdb rsma2 since %s", TD_VID(pVnode), tstrerror(code));
|
||||
goto _exit;
|
||||
}
|
||||
_exit:
|
||||
terrno = code;
|
||||
return code;
|
||||
}
|
||||
|
||||
#if 0
|
||||
|
@ -309,15 +340,6 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
|
|||
* 1) This is high cost task and should not put in asyncPreCommit originally.
|
||||
* 2) But, if put in asyncCommit, would trigger taskInfo cloning frequently.
|
||||
*/
|
||||
nLoops = 0;
|
||||
while (atomic_load_64(&pRSmaStat->nBufItems) > 0) {
|
||||
++nLoops;
|
||||
if (nLoops > 1000) {
|
||||
sched_yield();
|
||||
nLoops = 0;
|
||||
}
|
||||
}
|
||||
|
||||
smaInfo("vgId:%d, rsma commit, wait for all items to be consumed, TID:%p", SMA_VID(pSma),
|
||||
(void *)taosGetSelfPthreadId());
|
||||
nLoops = 0;
|
||||
|
@ -368,10 +390,6 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
|
|||
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) {
|
||||
int32_t code = 0;
|
||||
SVnode *pVnode = pSma->pVnode;
|
||||
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
|
||||
if (!pSmaEnv) {
|
||||
goto _exit;
|
||||
}
|
||||
#if 0
|
||||
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
|
||||
|
||||
|
@ -380,10 +398,7 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) {
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
#endif
|
||||
if ((code = tsdbCommit(VND_RSMA0(pVnode))) < 0) {
|
||||
smaError("vgId:%d, failed to commit tsdb rsma0 since %s", TD_VID(pVnode), tstrerror(code));
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
if ((code = tsdbCommit(VND_RSMA1(pVnode))) < 0) {
|
||||
smaError("vgId:%d, failed to commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(code));
|
||||
goto _exit;
|
||||
|
|
|
@ -213,12 +213,12 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
|
|||
#endif
|
||||
|
||||
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
|
||||
tqDebug("vgId:%d tq push msg ver %" PRId64 ", type: %s", pTq->pVnode->config.vgId, ver, TMSG_INFO(msgType));
|
||||
tqDebug("vgId:%d, tq push msg ver %" PRId64 ", type: %s", pTq->pVnode->config.vgId, ver, TMSG_INFO(msgType));
|
||||
|
||||
if (msgType == TDMT_VND_SUBMIT) {
|
||||
// lock push mgr to avoid potential msg lost
|
||||
taosWLockLatch(&pTq->pushLock);
|
||||
tqDebug("vgId:%d push handle num %d", pTq->pVnode->config.vgId, taosHashGetSize(pTq->pPushMgr));
|
||||
tqDebug("vgId:%d, push handle num %d", pTq->pVnode->config.vgId, taosHashGetSize(pTq->pPushMgr));
|
||||
if (taosHashGetSize(pTq->pPushMgr) != 0) {
|
||||
SArray* cachedKeys = taosArrayInit(0, sizeof(void*));
|
||||
SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t));
|
||||
|
@ -242,11 +242,11 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
|||
|
||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey));
|
||||
if (pHandle == NULL) {
|
||||
tqDebug("vgId:%d cannot find handle %s", pTq->pVnode->config.vgId, pPushEntry->subKey);
|
||||
tqDebug("vgId:%d, cannot find handle %s", pTq->pVnode->config.vgId, pPushEntry->subKey);
|
||||
continue;
|
||||
}
|
||||
if (pPushEntry->dataRsp.reqOffset.version >= ver) {
|
||||
tqDebug("vgId:%d push entry req version %" PRId64 ", while push version %" PRId64 ", skip",
|
||||
tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip",
|
||||
pTq->pVnode->config.vgId, pPushEntry->dataRsp.reqOffset.version, ver);
|
||||
continue;
|
||||
}
|
||||
|
@ -274,7 +274,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
|||
pRsp->blockNum++;
|
||||
}
|
||||
|
||||
tqDebug("vgId:%d tq handle push, subkey: %s, block num: %d", pTq->pVnode->config.vgId, pPushEntry->subKey,
|
||||
tqDebug("vgId:%d, tq handle push, subkey: %s, block num: %d", pTq->pVnode->config.vgId, pPushEntry->subKey,
|
||||
pRsp->blockNum);
|
||||
if (pRsp->blockNum > 0) {
|
||||
// set offset
|
||||
|
|
|
@ -52,13 +52,13 @@ int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** p
|
|||
goto _err;
|
||||
}
|
||||
|
||||
tqInfo("vgId:%d vnode snapshot tq reader opened", TD_VID(pTq->pVnode));
|
||||
tqInfo("vgId:%d, vnode snapshot tq reader opened", TD_VID(pTq->pVnode));
|
||||
|
||||
*ppReader = pReader;
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tqError("vgId:%d vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||
tqError("vgId:%d, vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||
*ppReader = NULL;
|
||||
return code;
|
||||
}
|
||||
|
@ -113,14 +113,14 @@ int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
|
|||
pHdr->size = vLen;
|
||||
memcpy(pHdr->data, pVal, vLen);
|
||||
|
||||
tqInfo("vgId:%d vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", TD_VID(pReader->pTq->pVnode),
|
||||
tqInfo("vgId:%d, vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", TD_VID(pReader->pTq->pVnode),
|
||||
handle.snapshotVer, handle.subKey, vLen);
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tqError("vgId:%d vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code));
|
||||
tqError("vgId:%d, vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -154,7 +154,7 @@ int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** p
|
|||
return code;
|
||||
|
||||
_err:
|
||||
tqError("vgId:%d tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||
tqError("vgId:%d, tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||
*ppWriter = NULL;
|
||||
return code;
|
||||
}
|
||||
|
@ -182,7 +182,7 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
|
|||
return code;
|
||||
|
||||
_err:
|
||||
tqError("vgId:%d tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code));
|
||||
tqError("vgId:%d, tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -204,6 +204,6 @@ int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
|||
|
||||
_err:
|
||||
tDecoderClear(pDecoder);
|
||||
tqError("vgId:%d vnode snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||
tqError("vgId:%d, vnode snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -52,13 +52,13 @@ int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** p
|
|||
goto _err;
|
||||
}
|
||||
|
||||
tqInfo("vgId:%d vnode snapshot tq reader opened", TD_VID(pTq->pVnode));
|
||||
tqInfo("vgId:%d, vnode snapshot tq reader opened", TD_VID(pTq->pVnode));
|
||||
|
||||
*ppReader = pReader;
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tqError("vgId:%d vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||
tqError("vgId:%d, vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||
*ppReader = NULL;
|
||||
return code;
|
||||
}
|
||||
|
@ -113,14 +113,14 @@ int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
|
|||
pHdr->size = vLen;
|
||||
memcpy(pHdr->data, pVal, vLen);
|
||||
|
||||
tqInfo("vgId:%d vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", TD_VID(pReader->pTq->pVnode),
|
||||
tqInfo("vgId:%d, vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", TD_VID(pReader->pTq->pVnode),
|
||||
handle.snapshotVer, handle.subKey, vLen);
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tqError("vgId:%d vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code));
|
||||
tqError("vgId:%d, vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -154,7 +154,7 @@ int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** p
|
|||
return code;
|
||||
|
||||
_err:
|
||||
tqError("vgId:%d tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||
tqError("vgId:%d, tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||
*ppWriter = NULL;
|
||||
return code;
|
||||
}
|
||||
|
@ -182,7 +182,7 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
|
|||
return code;
|
||||
|
||||
_err:
|
||||
tqError("vgId:%d tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code));
|
||||
tqError("vgId:%d, tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -204,6 +204,6 @@ int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
|||
|
||||
_err:
|
||||
tDecoderClear(pDecoder);
|
||||
tqError("vgId:%d vnode snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||
tqError("vgId:%d, vnode snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -145,7 +145,7 @@ int32_t tsdbBegin(STsdb *pTsdb) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -186,7 +186,7 @@ int32_t tsdbCommit(STsdb *pTsdb) {
|
|||
_exit:
|
||||
if (code) {
|
||||
tsdbEndCommit(&commith, code);
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -479,7 +479,7 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
|
||||
tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
|
@ -567,7 +567,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -619,7 +619,7 @@ int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapDa
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -662,7 +662,7 @@ int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -702,7 +702,7 @@ static int32_t tsdbCommitSttBlk(SDataFWriter *pWriter, SDiskDataBuilder *pBuilde
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -738,7 +738,7 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
|
||||
tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
|
@ -764,7 +764,7 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
|
||||
tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
|
@ -791,7 +791,7 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
tsdbDataFReaderClose(&pCommitter->dReader.pReader);
|
||||
tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 0);
|
||||
}
|
||||
|
@ -829,7 +829,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -886,7 +886,7 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
|
||||
tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
|
@ -945,7 +945,7 @@ static int32_t tsdbCommitData(SCommitter *pCommitter) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -1031,9 +1031,9 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
tsdbDebug("vgId:%d commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel);
|
||||
tsdbDebug("vgId:%d, commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -1056,9 +1056,9 @@ _exit:
|
|||
taosArrayDestroy(pCommitter->aTbDataP);
|
||||
pCommitter->aTbDataP = NULL;
|
||||
if (code || eno) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
tsdbInfo("vgId:%d tsdb end commit", TD_VID(pTsdb->pVnode));
|
||||
tsdbInfo("vgId:%d, tsdb end commit", TD_VID(pTsdb->pVnode));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -1150,7 +1150,7 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
|
||||
tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
|
@ -1198,7 +1198,7 @@ static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk)
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
|
||||
tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
|
@ -1285,7 +1285,7 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk)
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
|
||||
tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
|
@ -1360,7 +1360,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
|
||||
tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
|
@ -1409,7 +1409,7 @@ static int32_t tsdbInitSttBlockBuilderIfNeed(SCommitter *pCommitter, TABLEID id)
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
|
||||
tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
|
@ -1453,7 +1453,7 @@ static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
|
||||
tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
|
@ -1557,7 +1557,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
|
||||
tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
|
@ -1624,7 +1624,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
|
||||
tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
|
@ -1654,9 +1654,9 @@ int32_t tsdbFinishCommit(STsdb *pTsdb) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
tsdbInfo("vgId:%d tsdb finish commit", TD_VID(pTsdb->pVnode));
|
||||
tsdbInfo("vgId:%d, tsdb finish commit", TD_VID(pTsdb->pVnode));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -1670,9 +1670,9 @@ int32_t tsdbRollbackCommit(STsdb *pTsdb) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
tsdbInfo("vgId:%d tsdb rollback commit", TD_VID(pTsdb->pVnode));
|
||||
tsdbInfo("vgId:%d, tsdb rollback commit", TD_VID(pTsdb->pVnode));
|
||||
}
|
||||
return code;
|
||||
}
|
|
@ -253,7 +253,7 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code),
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s, fid:%d", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code),
|
||||
fid);
|
||||
}
|
||||
return code;
|
||||
|
@ -428,7 +428,7 @@ static int32_t tsdbNewFileSet(STsdb *pTsdb, SDFileSet *pSetTo, SDFileSet *pSetFr
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -598,7 +598,7 @@ static int32_t tsdbMergeFileSet(STsdb *pTsdb, SDFileSet *pSetOld, SDFileSet *pSe
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -694,7 +694,7 @@ static int32_t tsdbFSApplyChange(STsdb *pTsdb, STsdbFS *pFS) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -731,7 +731,7 @@ int32_t tsdbFSCommit(STsdb *pTsdb) {
|
|||
_exit:
|
||||
tsdbFSDestroy(&fs);
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -746,7 +746,7 @@ int32_t tsdbFSRollback(STsdb *pTsdb) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(errno));
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(errno));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -792,7 +792,7 @@ int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -903,7 +903,7 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -1023,7 +1023,7 @@ int32_t tsdbFSPrepareCommit(STsdb *pTsdb, STsdbFS *pFSNew) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -536,6 +536,10 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const SRBTreeNode *p1, const SRBTre
|
|||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SRBTreeNode *p2) {
|
||||
return -1 * tLDataIterCmprFn(p1, p2);
|
||||
}
|
||||
|
||||
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
||||
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
|
||||
bool destroyLoadInfo, const char *idStr) {
|
||||
|
@ -547,8 +551,11 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
|
|||
}
|
||||
|
||||
pMTree->idStr = idStr;
|
||||
|
||||
if (!pMTree->backward) { // asc
|
||||
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
|
||||
} else { // desc
|
||||
tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn);
|
||||
}
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
pMTree->pLoadInfo = pBlockLoadInfo;
|
||||
|
|
|
@ -415,7 +415,7 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx) {
|
|||
pHeadFile->size += size;
|
||||
|
||||
_exit:
|
||||
// tsdbTrace("vgId:%d write block idx, offset:%" PRId64 " size:%" PRId64 " nBlockIdx:%d",
|
||||
// tsdbTrace("vgId:%d, write block idx, offset:%" PRId64 " size:%" PRId64 " nBlockIdx:%d",
|
||||
// TD_VID(pWriter->pTsdb->pVnode),
|
||||
// pHeadFile->offset, size, taosArrayGetSize(aBlockIdx));
|
||||
return code;
|
||||
|
@ -498,12 +498,12 @@ int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk) {
|
|||
pSttFile->size += size;
|
||||
|
||||
_exit:
|
||||
tsdbTrace("vgId:%d tsdb write stt block, loffset:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode),
|
||||
tsdbTrace("vgId:%d, tsdb write stt block, loffset:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode),
|
||||
pSttFile->offset, size);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d tsdb write blockl failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
|
||||
tsdbError("vgId:%d, tsdb write blockl failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -539,7 +539,7 @@ static int32_t tsdbWriteBlockSma(SDataFWriter *pWriter, SBlockData *pBlockData,
|
|||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d tsdb write block sma failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
|
||||
tsdbError("vgId:%d, tsdb write block sma failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -601,13 +601,13 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
|
|||
}
|
||||
|
||||
_exit:
|
||||
tsdbTrace("vgId:%d tsdb write block data, suid:%" PRId64 " uid:%" PRId64 " nRow:%d, offset:%" PRId64 " size:%d",
|
||||
tsdbTrace("vgId:%d, tsdb write block data, suid:%" PRId64 " uid:%" PRId64 " nRow:%d, offset:%" PRId64 " size:%d",
|
||||
TD_VID(pWriter->pTsdb->pVnode), pBlockData->suid, pBlockData->uid, pBlockData->nRow, pBlkInfo->offset,
|
||||
pBlkInfo->szBlock);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d tsdb write block data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
|
||||
tsdbError("vgId:%d, tsdb write block data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -732,7 +732,7 @@ int32_t tsdbWriteDiskData(SDataFWriter *pWriter, const SDiskData *pDiskData, SBl
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed at %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -882,7 +882,7 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS
|
|||
_exit:
|
||||
if (code) {
|
||||
*ppReader = NULL;
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
|
||||
if (pReader) {
|
||||
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) tsdbCloseFile(&pReader->aSttFD[iStt]);
|
||||
|
@ -995,7 +995,7 @@ int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk) {
|
|||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d read stt blk failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||
tsdbError("vgId:%d, read stt blk failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1058,7 +1058,7 @@ int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pDataBlk, SArray *aCol
|
|||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d tsdb read block sma failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||
tsdbError("vgId:%d, tsdb read block sma failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1177,7 +1177,7 @@ _exit:
|
|||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d tsdb read block data impl failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||
tsdbError("vgId:%d, tsdb read block data impl failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1200,7 +1200,7 @@ int32_t tsdbReadDataBlockEx(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockDat
|
|||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d tsdb read data block ex failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||
tsdbError("vgId:%d, tsdb read data block ex failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1258,7 +1258,7 @@ int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData
|
|||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d tsdb read data block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||
tsdbError("vgId:%d, tsdb read data block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1271,7 +1271,7 @@ int32_t tsdbReadSttBlock(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk,
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed at %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -1294,7 +1294,7 @@ int32_t tsdbReadSttBlockEx(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed at %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -1338,7 +1338,7 @@ _exit:
|
|||
taosMemoryFree(pDelFWriter);
|
||||
}
|
||||
*ppWriter = NULL;
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(errno));
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(errno));
|
||||
} else {
|
||||
*ppWriter = pDelFWriter;
|
||||
}
|
||||
|
@ -1502,7 +1502,7 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb
|
|||
_exit:
|
||||
if (code) {
|
||||
*ppReader = NULL;
|
||||
tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
*ppReader = pDelFReader;
|
||||
}
|
||||
|
|
|
@ -174,7 +174,7 @@ static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) {
|
|||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d vnode snapshot tsdb snap read open file failed since %s", TD_VID(pReader->pTsdb->pVnode),
|
||||
tsdbError("vgId:%d, vnode snapshot tsdb snap read open file failed since %s", TD_VID(pReader->pTsdb->pVnode),
|
||||
tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
@ -517,7 +517,7 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s, TSDB path: %s", TD_VID(pTsdb->pVnode), __func__, lino,
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s, TSDB path: %s", TD_VID(pTsdb->pVnode), __func__, lino,
|
||||
tstrerror(code), pTsdb->path);
|
||||
*ppReader = NULL;
|
||||
|
||||
|
@ -738,7 +738,7 @@ static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pI
|
|||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -991,7 +991,7 @@ _exit:
|
|||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1072,7 +1072,7 @@ _exit:
|
|||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1338,7 +1338,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
*ppWriter = NULL;
|
||||
|
||||
if (pWriter) {
|
||||
|
|
|
@ -174,12 +174,12 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) {
|
|||
if (pPool->node.size != size) {
|
||||
SVBufPool *pPoolT = NULL;
|
||||
if (vnodeBufPoolCreate(pVnode, size, &pPoolT) < 0) {
|
||||
vWarn("vgId:%d try to change buf pools size from %" PRId64 " to %" PRId64 " since %s", TD_VID(pVnode),
|
||||
vWarn("vgId:%d, try to change buf pools size from %" PRId64 " to %" PRId64 " since %s", TD_VID(pVnode),
|
||||
pPool->node.size, size, tstrerror(errno));
|
||||
} else {
|
||||
vnodeBufPoolDestroy(pPool);
|
||||
pPool = pPoolT;
|
||||
vDebug("vgId:%d change buf pools size from %" PRId64 " to %" PRId64, TD_VID(pVnode), pPool->node.size, size);
|
||||
vDebug("vgId:%d, change buf pools size from %" PRId64 " to %" PRId64, TD_VID(pVnode), pPool->node.size, size);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -51,20 +51,11 @@ int vnodeBegin(SVnode *pVnode) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (pVnode->pSma) {
|
||||
if (VND_RSMA1(pVnode) && tsdbBegin(VND_RSMA1(pVnode)) < 0) {
|
||||
vError("vgId:%d, failed to begin rsma1 since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (VND_RSMA2(pVnode) && tsdbBegin(VND_RSMA2(pVnode)) < 0) {
|
||||
vError("vgId:%d, failed to begin rsma2 since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
// begin sma
|
||||
smaBegin(pVnode->pSma); // TODO: refactor to include the rsma1/rsma2 tsdbBegin() after tsdb_refact branch merged
|
||||
if (VND_IS_RSMA(pVnode) && smaBegin(pVnode->pSma) < 0) {
|
||||
vError("vgId:%d, failed to begin sma since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -239,10 +230,8 @@ int vnodeCommit(SVnode *pVnode) {
|
|||
}
|
||||
walBeginSnapshot(pVnode->pWal, pVnode->state.applied);
|
||||
|
||||
if (smaPreCommit(pVnode->pSma) < 0) {
|
||||
vError("vgId:%d, failed to pre-commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
code = smaPreCommit(pVnode->pSma);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
vnodeBufPoolUnRef(pVnode->inUse);
|
||||
pVnode->inUse = NULL;
|
||||
|
@ -253,14 +242,12 @@ int vnodeCommit(SVnode *pVnode) {
|
|||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
if (VND_IS_RSMA(pVnode)) {
|
||||
if (smaCommit(pVnode->pSma) < 0) {
|
||||
vError("vgId:%d, failed to commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
code = tsdbCommit(pVnode->pTsdb);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (VND_IS_RSMA(pVnode)) {
|
||||
code = smaCommit(pVnode->pSma);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
if (tqCommit(pVnode->pTq) < 0) {
|
||||
|
@ -274,7 +261,13 @@ int vnodeCommit(SVnode *pVnode) {
|
|||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
tsdbFinishCommit(pVnode->pTsdb);
|
||||
code = tsdbFinishCommit(pVnode->pTsdb);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (VND_IS_RSMA(pVnode)) {
|
||||
code = smaFinishCommit(pVnode->pSma);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
if (metaFinishCommit(pVnode->pMeta) < 0) {
|
||||
code = terrno;
|
||||
|
@ -293,7 +286,7 @@ int vnodeCommit(SVnode *pVnode) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
|
||||
vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
vInfo("vgId:%d, commit end", TD_VID(pVnode));
|
||||
}
|
||||
|
|
|
@ -219,7 +219,7 @@ _exit:
|
|||
return code;
|
||||
|
||||
_err:
|
||||
vError("vgId:%d vnode snapshot read failed since %s", TD_VID(pReader->pVnode), tstrerror(code));
|
||||
vError("vgId:%d, vnode snapshot read failed since %s", TD_VID(pReader->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -1089,18 +1089,18 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void
|
|||
}
|
||||
|
||||
if (pVnode->config.szBuf != req.buffer * 1024LL * 1024LL) {
|
||||
vInfo("vgId:%d vnode buffer is changed from %" PRId64 " to %" PRId64, TD_VID(pVnode), pVnode->config.szBuf,
|
||||
vInfo("vgId:%d, vnode buffer is changed from %" PRId64 " to %" PRId64, TD_VID(pVnode), pVnode->config.szBuf,
|
||||
(uint64_t)(req.buffer * 1024LL * 1024LL));
|
||||
pVnode->config.szBuf = req.buffer * 1024LL * 1024LL;
|
||||
}
|
||||
|
||||
if (pVnode->config.szCache != req.pages) {
|
||||
if (metaAlterCache(pVnode->pMeta, req.pages) < 0) {
|
||||
vError("vgId:%d failed to change vnode pages from %d to %d failed since %s", TD_VID(pVnode),
|
||||
vError("vgId:%d, failed to change vnode pages from %d to %d failed since %s", TD_VID(pVnode),
|
||||
pVnode->config.szCache, req.pages, tstrerror(errno));
|
||||
return errno;
|
||||
} else {
|
||||
vInfo("vgId:%d vnode pages is changed from %d to %d", TD_VID(pVnode), pVnode->config.szCache, req.pages);
|
||||
vInfo("vgId:%d, vnode pages is changed from %d to %d", TD_VID(pVnode), pVnode->config.szCache, req.pages);
|
||||
pVnode->config.szCache = req.pages;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -162,6 +162,8 @@ typedef struct {
|
|||
SQueryTableDataCond tableCond;
|
||||
int64_t recoverStartVer;
|
||||
int64_t recoverEndVer;
|
||||
int64_t fillHistoryVer1;
|
||||
int64_t fillHistoryVer2;
|
||||
SStreamState* pState;
|
||||
} SStreamTaskInfo;
|
||||
|
||||
|
|
|
@ -569,8 +569,10 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pResult != pSrcBlock) {
|
||||
pResult->info.groupId = pSrcBlock->info.groupId;
|
||||
memcpy(pResult->info.parTbName, pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
|
||||
}
|
||||
|
||||
// if the source equals to the destination, it is to create a new column as the result of scalar
|
||||
// function or some operators.
|
||||
|
@ -3036,7 +3038,8 @@ void cleanupExprSupp(SExprSupp* pSupp) {
|
|||
taosMemoryFree(pSupp->rowEntryInfoOffset);
|
||||
}
|
||||
|
||||
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode,SExecTaskInfo* pTaskInfo) {
|
||||
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
|
@ -4283,6 +4286,10 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta
|
|||
void* pVal = NULL;
|
||||
int32_t code = streamStateSessionGet(pState, pKey, &pVal, &size);
|
||||
ASSERT(code == 0);
|
||||
if (code == -1) {
|
||||
// coverity scan
|
||||
continue;
|
||||
}
|
||||
SResultRow* pRow = (SResultRow*)pVal;
|
||||
doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
|
||||
// no results, continue to check the next one
|
||||
|
|
|
@ -284,7 +284,6 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
|
|||
return true;
|
||||
}
|
||||
|
||||
|
||||
static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
|
||||
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
|
||||
SExprSupp* pSup = &pTableScanInfo->pseudoSup;
|
||||
|
@ -743,7 +742,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
|||
|
||||
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
|
||||
int32_t numOfCols = 0;
|
||||
pInfo->pColMatchInfo = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
|
||||
pInfo->pColMatchInfo =
|
||||
extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
|
||||
|
||||
int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1713,9 +1713,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
}
|
||||
#endif
|
||||
|
||||
#if 1
|
||||
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE) {
|
||||
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
||||
memcpy(&pTSInfo->cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
|
||||
pTSInfo->cond.startVersion = -1;
|
||||
pTSInfo->cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
|
||||
pTSInfo->scanTimes = 0;
|
||||
pTSInfo->currentGroupId = -1;
|
||||
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN;
|
||||
|
@ -1724,12 +1727,14 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN) {
|
||||
SSDataBlock* pBlock = doTableScan(pInfo->pTableScanOp);
|
||||
if (pBlock != NULL) {
|
||||
calBlockTbName(&pInfo->tbnameCalSup, pBlock);
|
||||
updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex);
|
||||
return pBlock;
|
||||
}
|
||||
// TODO fill in bloom filter
|
||||
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
|
||||
return NULL;
|
||||
}
|
||||
#endif
|
||||
|
||||
size_t total = taosArrayGetSize(pInfo->pBlockLists);
|
||||
// TODO: refactor
|
||||
|
@ -2115,7 +2120,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
|
|||
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo, NULL, NULL, NULL);
|
||||
return pOperator;
|
||||
|
||||
_end:
|
||||
_end:
|
||||
taosMemoryFree(pInfo);
|
||||
taosMemoryFree(pOperator);
|
||||
pTaskInfo->code = code;
|
||||
|
|
|
@ -3149,6 +3149,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
|
|||
|
||||
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
TSKEY maxTs = INT64_MIN;
|
||||
|
@ -3191,6 +3192,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
} else {
|
||||
deleteIntervalDiscBuf(pInfo->pState, pInfo->pPullDataMap, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark,
|
||||
&pInfo->interval, &pInfo->delKey);
|
||||
streamStateCommit(pTaskInfo->streamInfo.pState);
|
||||
}
|
||||
return NULL;
|
||||
} else {
|
||||
|
@ -3986,7 +3988,8 @@ int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHa
|
|||
return code;
|
||||
}
|
||||
}
|
||||
tSimpleHashIterateRemove(pHashMap, &pWinInfo->sessionWin, sizeof(SSessionKey), &pIte, &iter);
|
||||
SSessionKey* pKey = tSimpleHashGetKey(pIte, &keyLen);
|
||||
tSimpleHashIterateRemove(pHashMap, pKey, sizeof(SSessionKey), &pIte, &iter);
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -4006,7 +4009,7 @@ int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated) {
|
|||
void* pIte = NULL;
|
||||
int32_t iter = 0;
|
||||
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
|
||||
SResultWindowInfo* pWinInfo = *(void**)pIte;
|
||||
SResultWindowInfo* pWinInfo = pIte;
|
||||
saveResult(*pWinInfo, pStUpdated);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -4584,6 +4587,12 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
|||
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
|
||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||
SSessionKey key = curWin.winInfo.sessionWin;
|
||||
key.win.ekey = key.win.skey;
|
||||
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4974,8 +4983,6 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
goto _error;
|
||||
}
|
||||
|
||||
|
||||
|
||||
SInterval interval = {.interval = pNode->interval,
|
||||
.sliding = pNode->sliding,
|
||||
.intervalUnit = pNode->intervalUnit,
|
||||
|
@ -5382,6 +5389,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
deleteIntervalDiscBuf(pInfo->pState, NULL, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, &pInfo->interval,
|
||||
&pInfo->delKey);
|
||||
doSetOperatorCompleted(pOperator);
|
||||
streamStateCommit(pTaskInfo->streamInfo.pState);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -920,7 +920,7 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
|
|||
code = doSetupUdf(udfName, pHandle);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
SUdfcFuncStub stub = {0};
|
||||
strcpy(stub.udfName, udfName);
|
||||
strncpy(stub.udfName, udfName, TSDB_FUNC_NAME_LEN);
|
||||
stub.handle = *pHandle;
|
||||
++stub.refCount;
|
||||
stub.lastRefTime = taosGetTimestampUs();
|
||||
|
|
|
@ -455,8 +455,13 @@ static int32_t collectMetaKeyFromShowLicence(SCollectMetaKeyCxt* pCxt, SShowStmt
|
|||
}
|
||||
|
||||
static int32_t collectMetaKeyFromShowVgroups(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
|
||||
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_VGROUPS,
|
||||
int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_VGROUPS,
|
||||
pCxt->pMetaCache);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
// just to verify whether the database exists
|
||||
code = reserveDbCfgInCache(pCxt->pParseCxt->acctId, ((SValueNode*)pStmt->pDbName)->literal, pCxt->pMetaCache);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t collectMetaKeyFromShowTopics(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
|
||||
|
|
|
@ -6212,6 +6212,20 @@ static int32_t rewriteShow(STranslateContext* pCxt, SQuery* pQuery) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t checkShowVgroups(STranslateContext* pCxt, SShowStmt* pShow) {
|
||||
// just to verify whether the database exists
|
||||
SDbCfgInfo dbCfg = {0};
|
||||
return getDBCfg(pCxt, ((SValueNode*)pShow->pDbName)->literal, &dbCfg);
|
||||
}
|
||||
|
||||
static int32_t rewriteShowVgroups(STranslateContext* pCxt, SQuery* pQuery) {
|
||||
int32_t code = checkShowVgroups(pCxt, (SShowStmt*)pQuery->pRoot);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = rewriteShow(pCxt, pQuery);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static SNode* createTagsFunction() {
|
||||
SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
|
||||
if (NULL == pFunc) {
|
||||
|
@ -7358,7 +7372,6 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
|||
case QUERY_NODE_SHOW_STABLES_STMT:
|
||||
case QUERY_NODE_SHOW_USERS_STMT:
|
||||
case QUERY_NODE_SHOW_DNODES_STMT:
|
||||
case QUERY_NODE_SHOW_VGROUPS_STMT:
|
||||
case QUERY_NODE_SHOW_MNODES_STMT:
|
||||
case QUERY_NODE_SHOW_MODULES_STMT:
|
||||
case QUERY_NODE_SHOW_QNODES_STMT:
|
||||
|
@ -7378,6 +7391,9 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
|||
case QUERY_NODE_SHOW_TAGS_STMT:
|
||||
code = rewriteShow(pCxt, pQuery);
|
||||
break;
|
||||
case QUERY_NODE_SHOW_VGROUPS_STMT:
|
||||
code = rewriteShowVgroups(pCxt, pQuery);
|
||||
break;
|
||||
case QUERY_NODE_SHOW_TABLE_TAGS_STMT:
|
||||
code = rewriteShowStableTags(pCxt, pQuery);
|
||||
break;
|
||||
|
|
|
@ -94,7 +94,7 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char*
|
|||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
|
||||
if (tDecodeSStreamTask(&decoder, pTask) < 0) {
|
||||
ASSERT(0);
|
||||
tDecoderClear(&decoder);
|
||||
goto FAIL;
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
|
@ -113,6 +113,13 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char*
|
|||
ASSERT(0);
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
if (pTask->fillHistory) {
|
||||
// pipeline exec
|
||||
// if finished, dispatch a stream-prepare-finished msg to downstream task
|
||||
// set status normal
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
||||
FAIL:
|
||||
|
@ -120,6 +127,7 @@ FAIL:
|
|||
return -1;
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||
void* buf = NULL;
|
||||
if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) {
|
||||
|
@ -149,6 +157,7 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) {
|
|||
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId) {
|
||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
|
||||
|
|
|
@ -656,6 +656,7 @@ int32_t streamStateSessionGetKey(SStreamState* pState, const SSessionKey* key, S
|
|||
streamStateCurPrev(pState, pCur);
|
||||
}
|
||||
*curKey = resKey;
|
||||
streamStateFreeCur(pCur);
|
||||
return res;
|
||||
}
|
||||
|
||||
|
|
|
@ -49,7 +49,7 @@ int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) {
|
|||
}
|
||||
|
||||
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
||||
/*if (tStartEncode(pEncoder) < 0) return -1;*/
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pTask->totalLevel) < 0) return -1;
|
||||
|
@ -64,6 +64,10 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
|||
if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
|
||||
if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1;
|
||||
|
||||
if (tEncodeI64(pEncoder, pTask->recoverSnapVer) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pTask->startVer) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pTask->fillHistory) < 0) return -1;
|
||||
|
||||
int32_t epSz = taosArrayGetSize(pTask->childEpInfo);
|
||||
if (tEncodeI32(pEncoder, epSz) < 0) return -1;
|
||||
for (int32_t i = 0; i < epSz; i++) {
|
||||
|
@ -93,12 +97,12 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
|||
}
|
||||
if (tEncodeI64(pEncoder, pTask->triggerParam) < 0) return -1;
|
||||
|
||||
/*tEndEncode(pEncoder);*/
|
||||
tEndEncode(pEncoder);
|
||||
return pEncoder->pos;
|
||||
}
|
||||
|
||||
int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
||||
/*if (tStartDecode(pDecoder) < 0) return -1;*/
|
||||
if (tStartDecode(pDecoder) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pTask->totalLevel) < 0) return -1;
|
||||
|
@ -113,6 +117,10 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
|||
if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
|
||||
if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1;
|
||||
|
||||
if (tDecodeI64(pDecoder, &pTask->recoverSnapVer) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pTask->startVer) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pTask->fillHistory) < 0) return -1;
|
||||
|
||||
int32_t epSz;
|
||||
if (tDecodeI32(pDecoder, &epSz) < 0) return -1;
|
||||
pTask->childEpInfo = taosArrayInit(epSz, sizeof(void*));
|
||||
|
@ -150,7 +158,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
|||
}
|
||||
if (tDecodeI64(pDecoder, &pTask->triggerParam) < 0) return -1;
|
||||
|
||||
/*tEndDecode(pDecoder);*/
|
||||
tEndDecode(pDecoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
*/
|
||||
|
||||
#include "query.h"
|
||||
#include "tdatablock.h"
|
||||
#include "tencode.h"
|
||||
#include "tstreamUpdate.h"
|
||||
#include "ttime.h"
|
||||
|
@ -162,15 +163,42 @@ bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid) {
|
|||
return false;
|
||||
}
|
||||
|
||||
void updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol) {
|
||||
if (pBlock == NULL || pBlock->info.rows == 0) return;
|
||||
TSKEY maxTs = -1;
|
||||
int64_t tbUid = pBlock->info.uid;
|
||||
|
||||
SColumnInfoData *pColDataInfo = taosArrayGet(pBlock->pDataBlock, primaryTsCol);
|
||||
|
||||
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
||||
TSKEY ts = ((TSKEY *)pColDataInfo->pData)[i];
|
||||
maxTs = TMAX(maxTs, ts);
|
||||
SScalableBf *pSBf = getSBf(pInfo, ts);
|
||||
if (pSBf) {
|
||||
tScalableBfPut(pSBf, &ts, sizeof(TSKEY));
|
||||
}
|
||||
}
|
||||
TSKEY *pMaxTs = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t));
|
||||
if (pMaxTs == NULL || *pMaxTs > tbUid) {
|
||||
taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), &maxTs, sizeof(TSKEY));
|
||||
}
|
||||
}
|
||||
|
||||
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
|
||||
int32_t res = TSDB_CODE_FAILED;
|
||||
|
||||
SUpdateKey updateKey = {
|
||||
.tbUid = tableId,
|
||||
.ts = ts,
|
||||
};
|
||||
|
||||
TSKEY *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
|
||||
uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets;
|
||||
TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index);
|
||||
if (ts < maxTs - pInfo->watermark) {
|
||||
// this window has been closed.
|
||||
if (pInfo->pCloseWinSBF) {
|
||||
res = tScalableBfPut(pInfo->pCloseWinSBF, &ts, sizeof(TSKEY));
|
||||
res = tScalableBfPut(pInfo->pCloseWinSBF, &updateKey, sizeof(SUpdateKey));
|
||||
if (res == TSDB_CODE_SUCCESS) {
|
||||
return false;
|
||||
} else {
|
||||
|
@ -183,7 +211,7 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
|
|||
SScalableBf *pSBf = getSBf(pInfo, ts);
|
||||
// pSBf may be a null pointer
|
||||
if (pSBf) {
|
||||
res = tScalableBfPut(pSBf, &ts, sizeof(TSKEY));
|
||||
res = tScalableBfPut(pSBf, &updateKey, sizeof(SUpdateKey));
|
||||
}
|
||||
|
||||
int32_t size = taosHashGetSize(pInfo->pMap);
|
||||
|
|
|
@ -18,3 +18,8 @@ TARGET_INCLUDE_DIRECTORIES(
|
|||
PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/"
|
||||
PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc"
|
||||
)
|
||||
|
||||
add_test(
|
||||
NAME streamUpdateTest
|
||||
COMMAND streamUpdateTest
|
||||
)
|
|
@ -234,7 +234,15 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32
|
|||
cli->addr = tstrdup(server);
|
||||
cli->port = port;
|
||||
|
||||
uv_loop_t* loop = uv_default_loop();
|
||||
uv_loop_t* loop = taosMemoryMalloc(sizeof(uv_loop_t));
|
||||
int err = uv_loop_init(loop);
|
||||
if (err != 0) {
|
||||
uError("http-report failed to init uv_loop, reason: %s", uv_strerror(err));
|
||||
taosMemoryFree(loop);
|
||||
terrno = TAOS_SYSTEM_ERROR(err);
|
||||
destroyHttpClient(cli);
|
||||
return terrno;
|
||||
}
|
||||
uv_tcp_init(loop, &cli->tcp);
|
||||
// set up timeout to avoid stuck;
|
||||
int32_t fd = taosCreateSocketWithTimeout(5);
|
||||
|
@ -258,5 +266,6 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32
|
|||
|
||||
uv_run(loop, UV_RUN_DEFAULT);
|
||||
uv_loop_close(loop);
|
||||
taosMemoryFree(loop);
|
||||
return terrno;
|
||||
}
|
||||
|
|
|
@ -528,7 +528,7 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
|
|||
idxEntry.offset, fLogNameStr);
|
||||
goto _err;
|
||||
}
|
||||
wWarn("vgId:%d wal idx append new entry %" PRId64 " %" PRId64, pWal->cfg.vgId, idxEntry.ver, idxEntry.offset);
|
||||
wWarn("vgId:%d, wal idx append new entry %" PRId64 " %" PRId64, pWal->cfg.vgId, idxEntry.ver, idxEntry.offset);
|
||||
if (taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)) < 0) {
|
||||
wError("vgId:%d, failed to append file since %s. file:%s", pWal->cfg.vgId, terrstr(), fnameStr);
|
||||
goto _err;
|
||||
|
@ -812,7 +812,7 @@ int walLoadMeta(SWal* pWal) {
|
|||
// find existing meta file
|
||||
int metaVer = walFindCurMetaVer(pWal);
|
||||
if (metaVer == -1) {
|
||||
wDebug("vgId:%d wal find meta ver %d", pWal->cfg.vgId, metaVer);
|
||||
wDebug("vgId:%d, wal find meta ver %d", pWal->cfg.vgId, metaVer);
|
||||
return -1;
|
||||
}
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
|
@ -822,7 +822,7 @@ int walLoadMeta(SWal* pWal) {
|
|||
taosStatFile(fnameStr, &fileSize, NULL);
|
||||
if (fileSize == 0) {
|
||||
taosRemoveFile(fnameStr);
|
||||
wDebug("vgId:%d wal find empty meta ver %d", pWal->cfg.vgId, metaVer);
|
||||
wDebug("vgId:%d, wal find empty meta ver %d", pWal->cfg.vgId, metaVer);
|
||||
return -1;
|
||||
}
|
||||
int size = (int)fileSize;
|
||||
|
|
|
@ -138,12 +138,12 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
|
|||
(void)walLoadMeta(pWal);
|
||||
|
||||
if (walCheckAndRepairMeta(pWal) < 0) {
|
||||
wError("vgId:%d cannot open wal since repair meta file failed", pWal->cfg.vgId);
|
||||
wError("vgId:%d, cannot open wal since repair meta file failed", pWal->cfg.vgId);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (walCheckAndRepairIdx(pWal) < 0) {
|
||||
wError("vgId:%d cannot open wal since repair idx file failed", pWal->cfg.vgId);
|
||||
wError("vgId:%d, cannot open wal since repair idx file failed", pWal->cfg.vgId);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
|
|
|
@ -44,20 +44,20 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
|
|||
walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
|
||||
if (taosRemoveFile(fnameStr) < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("vgId:%d restore from snapshot, cannot remove file %s since %s", pWal->cfg.vgId, fnameStr, terrstr());
|
||||
wError("vgId:%d, restore from snapshot, cannot remove file %s since %s", pWal->cfg.vgId, fnameStr, terrstr());
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
wInfo("vgId:%d restore from snapshot, remove file %s", pWal->cfg.vgId, fnameStr);
|
||||
wInfo("vgId:%d, restore from snapshot, remove file %s", pWal->cfg.vgId, fnameStr);
|
||||
|
||||
walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
|
||||
if (taosRemoveFile(fnameStr) < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("vgId:%d cannot remove file %s since %s", pWal->cfg.vgId, fnameStr, terrstr());
|
||||
wError("vgId:%d, cannot remove file %s since %s", pWal->cfg.vgId, fnameStr, terrstr());
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
wInfo("vgId:%d restore from snapshot, remove file %s", pWal->cfg.vgId, fnameStr);
|
||||
wInfo("vgId:%d, restore from snapshot, remove file %s", pWal->cfg.vgId, fnameStr);
|
||||
}
|
||||
}
|
||||
walRemoveMeta(pWal);
|
||||
|
|
|
@ -57,8 +57,10 @@ SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate) {
|
|||
|
||||
// ln(2) = 0.693147180559945
|
||||
pBF->hashFunctions = (uint32_t)ceil(lnRate / 0.693147180559945);
|
||||
pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);
|
||||
pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR);
|
||||
/*pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);*/
|
||||
/*pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR);*/
|
||||
pBF->hashFn1 = taosFastHash;
|
||||
pBF->hashFn2 = taosDJB2Hash;
|
||||
pBF->buffer = taosMemoryCalloc(pBF->numUnits, sizeof(uint64_t));
|
||||
if (pBF->buffer == NULL) {
|
||||
tBloomFilterDestroy(pBF);
|
||||
|
|
|
@ -32,6 +32,23 @@
|
|||
(h) ^= (h) >> 16; \
|
||||
} while (0)
|
||||
|
||||
uint32_t taosFastHash(const char *key, uint32_t len) {
|
||||
uint32_t result = 0x55555555;
|
||||
for (uint32_t i = 0; i < len; i++) {
|
||||
result ^= (uint8_t)key[i];
|
||||
result = ROTL32(result, 5);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
uint32_t taosDJB2Hash(const char *key, uint32_t len) {
|
||||
uint32_t hash = 5381;
|
||||
for (uint32_t i = 0; i < len; i++) {
|
||||
hash = ((hash << 5) + hash) + (uint8_t)key[i]; /* hash * 33 + c */
|
||||
}
|
||||
return hash;
|
||||
}
|
||||
|
||||
uint32_t MurmurHash3_32(const char *key, uint32_t len) {
|
||||
const uint8_t *data = (const uint8_t *)key;
|
||||
const int32_t nblocks = len >> 2u;
|
||||
|
|
|
@ -32,6 +32,26 @@ sql alter dnode 1 'rpcDebugFlag 131'
|
|||
sql alter dnode 1 'qDebugFlag 131'
|
||||
sql alter dnode 1 'metaDebugFlag 131'
|
||||
|
||||
sql alter dnode 1 'debugFlag' '135'
|
||||
sql alter dnode 1 'dDebugFlag' '131'
|
||||
sql alter dnode 1 'vDebugFlag' '131'
|
||||
sql alter dnode 1 'mDebugFlag' '131'
|
||||
sql alter dnode 1 'wDebugFlag' '131'
|
||||
sql alter dnode 1 'sDebugFlag' '131'
|
||||
sql alter dnode 1 'tsdbDebugFlag' '131'
|
||||
sql alter dnode 1 'tqDebugFlag' '131'
|
||||
sql alter dnode 1 'fsDebugFlag' '131'
|
||||
sql alter dnode 1 'udfDebugFlag' '131'
|
||||
sql alter dnode 1 'smaDebugFlag' '131'
|
||||
sql alter dnode 1 'idxDebugFlag' '131'
|
||||
sql alter dnode 1 'tdbDebugFlag' '131'
|
||||
sql alter dnode 1 'tmrDebugFlag' '131'
|
||||
sql alter dnode 1 'uDebugFlag' '131'
|
||||
sql alter dnode 1 'smaDebugFlag' '131'
|
||||
sql alter dnode 1 'rpcDebugFlag' '131'
|
||||
sql alter dnode 1 'qDebugFlag' '131'
|
||||
sql alter dnode 1 'metaDebugFlag' '131'
|
||||
|
||||
sql_error alter dnode 2 'wDebugFlag 135'
|
||||
sql_error alter dnode 2 'tmrDebugFlag 135'
|
||||
sql_error alter dnode 1 'monDebugFlag 131'
|
||||
|
@ -39,6 +59,13 @@ sql_error alter dnode 1 'cqDebugFlag 131'
|
|||
sql_error alter dnode 1 'httpDebugFlag 131'
|
||||
sql_error alter dnode 1 'mqttDebugFlag 131'
|
||||
|
||||
sql_error alter dnode 2 'wDebugFlag' '135'
|
||||
sql_error alter dnode 2 'tmrDebugFlag' '135'
|
||||
sql_error alter dnode 1 'monDebugFlag' '131'
|
||||
sql_error alter dnode 1 'cqDebugFlag' '131'
|
||||
sql_error alter dnode 1 'httpDebugFlag' '131'
|
||||
sql_error alter dnode 1 'mqttDebugFlag' '131'
|
||||
|
||||
print ======== step3
|
||||
sql_error alter $hostname1 debugFlag 135
|
||||
sql_error alter $hostname1 monDebugFlag 135
|
||||
|
|
|
@ -134,6 +134,7 @@ if $rows != 1 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
return
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode3 -s stop -x SIGINT
|
||||
|
|
|
@ -5,7 +5,7 @@ sleep 50
|
|||
sql connect
|
||||
|
||||
print =============== create database
|
||||
sql create database test vgroups 1
|
||||
sql create database test vgroups 1;
|
||||
sql select * from information_schema.ins_databases
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
|
|
|
@ -214,7 +214,7 @@ class TDTestCase:
|
|||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
|
||||
actConsumeTotalRows = resultList[0]
|
||||
|
||||
tdLog.info("act consume rows: %d, expect rows range (0, %d)"%(actConsumeTotalRows, totalRowsInserted))
|
||||
if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted):
|
||||
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
|
||||
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
|
||||
|
|
|
@ -30,6 +30,7 @@ void shellClearScreen(int32_t ecmd_pos, int32_t cursor_pos);
|
|||
void shellGetPrevCharSize(const char* str, int32_t pos, int32_t* size, int32_t* width);
|
||||
void shellShowOnScreen(SShellCmd* cmd);
|
||||
void shellInsertChar(SShellCmd* cmd, char* c, int size);
|
||||
void shellInsertStr(SShellCmd* cmd, char* str, int size);
|
||||
bool appendAfterSelect(TAOS* con, SShellCmd* cmd, char* p, int32_t len);
|
||||
|
||||
typedef struct SAutoPtr {
|
||||
|
@ -1099,7 +1100,7 @@ void printScreen(TAOS* con, SShellCmd* cmd, SWords* match) {
|
|||
}
|
||||
|
||||
// insert new
|
||||
shellInsertChar(cmd, (char*)str, strLen);
|
||||
shellInsertStr(cmd, (char*)str, strLen);
|
||||
}
|
||||
|
||||
// main key press tab , matched return true else false
|
||||
|
@ -1220,7 +1221,7 @@ bool fillWithType(TAOS* con, SShellCmd* cmd, char* pre, int type) {
|
|||
|
||||
// show
|
||||
int count = strlen(part);
|
||||
shellInsertChar(cmd, part, count);
|
||||
shellInsertStr(cmd, part, count);
|
||||
cntDel = count; // next press tab delete current append count
|
||||
|
||||
taosMemoryFree(str);
|
||||
|
@ -1247,7 +1248,7 @@ bool fillTableName(TAOS* con, SShellCmd* cmd, char* pre) {
|
|||
|
||||
// show
|
||||
int count = strlen(part);
|
||||
shellInsertChar(cmd, part, count);
|
||||
shellInsertStr(cmd, part, count);
|
||||
cntDel = count; // next press tab delete current append count
|
||||
|
||||
taosMemoryFree(str);
|
||||
|
@ -1371,7 +1372,7 @@ bool appendAfterSelect(TAOS* con, SShellCmd* cmd, char* sql, int32_t len) {
|
|||
bool fieldEnd = fieldsInputEnd(p);
|
||||
// check fields input end then insert from keyword
|
||||
if (fieldEnd && p[len - 1] == ' ') {
|
||||
shellInsertChar(cmd, "from", 4);
|
||||
shellInsertStr(cmd, "from", 4);
|
||||
taosMemoryFree(p);
|
||||
return true;
|
||||
}
|
||||
|
@ -1569,7 +1570,7 @@ bool matchOther(TAOS* con, SShellCmd* cmd) {
|
|||
if (p[len - 1] == '\\') {
|
||||
// append '\G'
|
||||
char a[] = "G;";
|
||||
shellInsertChar(cmd, a, 2);
|
||||
shellInsertStr(cmd, a, 2);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -47,6 +47,7 @@ void shellClearScreen(int32_t ecmd_pos, int32_t cursor_pos);
|
|||
void shellShowOnScreen(SShellCmd *cmd);
|
||||
void shellGetPrevCharSize(const char *str, int32_t pos, int32_t *size, int32_t *width);
|
||||
void shellInsertChar(SShellCmd *cmd, char *c, int size);
|
||||
void shellInsertString(SShellCmd *cmd, char *str, int size);
|
||||
|
||||
int32_t shellCountPrefixOnes(uint8_t c) {
|
||||
uint8_t mask = 127;
|
||||
|
@ -101,11 +102,30 @@ void shellInsertChar(SShellCmd *cmd, char *c, int32_t size) {
|
|||
/* update the values */
|
||||
cmd->commandSize += size;
|
||||
cmd->cursorOffset += size;
|
||||
for (int i = 0; i < size; i++) {
|
||||
taosMbToWchar(&wc, c + i, size);
|
||||
cmd->screenOffset += taosWcharWidth(wc);
|
||||
cmd->endOffset += taosWcharWidth(wc);
|
||||
}
|
||||
|
||||
// set string end
|
||||
cmd->command[cmd->commandSize] = 0;
|
||||
#ifdef WINDOWS
|
||||
#else
|
||||
shellShowOnScreen(cmd);
|
||||
#endif
|
||||
}
|
||||
|
||||
// insert string . count is str char count
|
||||
void shellInsertStr(SShellCmd *cmd, char *str, int32_t size) {
|
||||
shellClearScreen(cmd->endOffset + PSIZE, cmd->screenOffset + PSIZE);
|
||||
/* update the buffer */
|
||||
memmove(cmd->command + cmd->cursorOffset + size, cmd->command + cmd->cursorOffset,
|
||||
cmd->commandSize - cmd->cursorOffset);
|
||||
memcpy(cmd->command + cmd->cursorOffset, str, size);
|
||||
/* update the values */
|
||||
cmd->commandSize += size;
|
||||
cmd->cursorOffset += size;
|
||||
cmd->screenOffset += size;
|
||||
cmd->endOffset += size;
|
||||
|
||||
// set string end
|
||||
cmd->command[cmd->commandSize] = 0;
|
||||
#ifdef WINDOWS
|
||||
|
@ -480,9 +500,11 @@ int32_t shellReadCommand(char *command) {
|
|||
}
|
||||
shellInsertChar(&cmd, utf8_array, count);
|
||||
pressOtherKey(c);
|
||||
#ifndef WINDOWS
|
||||
} else if (c == TAB_KEY) {
|
||||
// press TAB key
|
||||
pressTabKey(&cmd);
|
||||
#endif
|
||||
} else if (c < '\033') {
|
||||
pressOtherKey(c);
|
||||
// Ctrl keys. TODO: Implement ctrl combinations
|
||||
|
|
|
@ -282,9 +282,14 @@ char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) {
|
|||
|
||||
void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, int32_t length, int32_t precision) {
|
||||
if (val == NULL) {
|
||||
taosFprintfFile(pFile, "NULL");
|
||||
return;
|
||||
}
|
||||
|
||||
char quotationStr[2];
|
||||
quotationStr[0] = '\"';
|
||||
quotationStr[1] = 0;
|
||||
|
||||
int n;
|
||||
char buf[TSDB_MAX_BYTES_PER_ROW];
|
||||
switch (field->type) {
|
||||
|
@ -330,33 +335,23 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i
|
|||
case TSDB_DATA_TYPE_NCHAR:
|
||||
case TSDB_DATA_TYPE_JSON:
|
||||
{
|
||||
char quotationStr[2];
|
||||
int32_t bufIndex = 0;
|
||||
quotationStr[0] = 0;
|
||||
quotationStr[1] = 0;
|
||||
for (int32_t i = 0; i < length; i++) {
|
||||
buf[bufIndex] = val[i];
|
||||
bufIndex++;
|
||||
if (val[i] == '\"') {
|
||||
buf[bufIndex] = val[i];
|
||||
bufIndex++;
|
||||
quotationStr[0] = '\"';
|
||||
}
|
||||
if (val[i] == ',') {
|
||||
quotationStr[0] = '\"';
|
||||
}
|
||||
}
|
||||
buf[bufIndex] = 0;
|
||||
if (length == 0) {
|
||||
quotationStr[0] = '\"';
|
||||
}
|
||||
|
||||
taosFprintfFile(pFile, "%s%s%s", quotationStr, buf, quotationStr);
|
||||
}
|
||||
break;
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
shellFormatTimestamp(buf, *(int64_t *)val, precision);
|
||||
taosFprintfFile(pFile, "%s", buf);
|
||||
taosFprintfFile(pFile, "%s%s%s", quotationStr, buf, quotationStr);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
|
|
|
@ -497,9 +497,14 @@ static char* shellFormatTimestamp(char* buf, int64_t val, int32_t precision) {
|
|||
static void shellDumpFieldToFile(TdFilePtr pFile, const char* val, TAOS_FIELD* field, int32_t length,
|
||||
int32_t precision) {
|
||||
if (val == NULL) {
|
||||
taosFprintfFile(pFile, "NULL");
|
||||
return;
|
||||
}
|
||||
|
||||
char quotationStr[2];
|
||||
quotationStr[0] = '\"';
|
||||
quotationStr[1] = 0;
|
||||
|
||||
int n;
|
||||
char buf[TSDB_MAX_BYTES_PER_ROW];
|
||||
switch (field->type) {
|
||||
|
@ -545,33 +550,23 @@ static void shellDumpFieldToFile(TdFilePtr pFile, const char* val, TAOS_FIELD* f
|
|||
case TSDB_DATA_TYPE_NCHAR:
|
||||
case TSDB_DATA_TYPE_JSON:
|
||||
{
|
||||
char quotationStr[2];
|
||||
int32_t bufIndex = 0;
|
||||
quotationStr[0] = 0;
|
||||
quotationStr[1] = 0;
|
||||
for (int32_t i = 0; i < length; i++) {
|
||||
buf[bufIndex] = val[i];
|
||||
bufIndex++;
|
||||
if (val[i] == '\"') {
|
||||
buf[bufIndex] = val[i];
|
||||
bufIndex++;
|
||||
quotationStr[0] = '\"';
|
||||
}
|
||||
if (val[i] == ',') {
|
||||
quotationStr[0] = '\"';
|
||||
}
|
||||
}
|
||||
buf[bufIndex] = 0;
|
||||
if (length == 0) {
|
||||
quotationStr[0] = '\"';
|
||||
}
|
||||
|
||||
taosFprintfFile(pFile, "%s%s%s", quotationStr, buf, quotationStr);
|
||||
}
|
||||
break;
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
shellFormatTimestamp(buf, *(int64_t*)val, precision);
|
||||
taosFprintfFile(pFile, "%s", buf);
|
||||
taosFprintfFile(pFile, "%s%s%s", quotationStr, buf, quotationStr);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
|
|
Loading…
Reference in New Issue