Merge branch '3.0' into coverage/TD-28281-3.0

This commit is contained in:
Alex Duan 2024-01-13 12:26:52 +08:00
commit b18208ee4a
48 changed files with 583 additions and 261 deletions

View File

@ -53,9 +53,12 @@ typedef struct {
void* mgmt;
void* clientRpc;
void* serverRpc;
void* statusRpc;
void* syncRpc;
PutToQueueFp putToQueueFp;
GetQueueSizeFp qsizeFp;
SendReqFp sendReqFp;
SendReqFp sendSyncReqFp;
SendRspFp sendRspFp;
RegisterBrokenLinkArgFp registerBrokenLinkArgFp;
ReleaseHandleFp releaseHandleFp;
@ -67,6 +70,7 @@ void tmsgSetDefault(const SMsgCb* msgcb);
int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg);
int32_t tmsgGetQueueSize(const SMsgCb* msgcb, int32_t vgId, EQueueType qtype);
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg);
int32_t tmsgSendSyncReq(const SEpSet* epSet, SRpcMsg* pMsg);
void tmsgSendRsp(SRpcMsg* pMsg);
void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg);
void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type);

View File

@ -75,6 +75,7 @@ static FORCE_INLINE int64_t taosGetTimestampToday(int32_t precision) {
int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision);
int64_t taosTimeTruncate(int64_t ts, const SInterval* pInterval);
int64_t taosTimeGetIntervalEnd(int64_t ts, const SInterval* pInterval);
int32_t taosTimeCountIntervalForFill(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision, int32_t order);
int32_t parseAbsoluteDuration(const char* token, int32_t tokenlen, int64_t* ts, char* unit, int32_t timePrecision);

View File

@ -259,9 +259,13 @@ EFuncDataRequired fmFuncDynDataRequired(int32_t funcId, void* pRes, STimeWindow*
int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet);
int32_t fmGetUdafExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet);
#ifdef BUILD_NO_CALL
int32_t fmSetInvertFunc(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmSetNormalFunc(int32_t funcId, SFuncExecFuncs* pFpSet);
bool fmIsInvertible(int32_t funcId);
#endif
char* fmGetFuncName(int32_t funcId);
#ifdef __cplusplus

View File

@ -562,6 +562,7 @@ struct SStreamDispatchReq {
int32_t upstreamTaskId;
int32_t upstreamChildId;
int32_t upstreamNodeId;
int32_t upstreamRelTaskId;
int32_t blockNum;
int64_t totalLen;
SArray* dataLen; // SArray<int32_t>

View File

@ -260,7 +260,7 @@ typedef enum ELogicConditionType {
#define TSDB_PASSWORD_LEN 32
#define TSDB_USET_PASSWORD_LEN 129
#define TSDB_VERSION_LEN 32
#define TSDB_LABEL_LEN 12
#define TSDB_LABEL_LEN 16
#define TSDB_JOB_STATUS_LEN 32
#define TSDB_CLUSTER_ID_LEN 40

View File

@ -1968,7 +1968,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
void* rspObj = NULL;
int64_t startTime = taosGetTimestampMs();
tscInfo("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime,
tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime,
timeout);
// in no topic status, delayed task also need to be processed
@ -2015,7 +2015,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
int64_t currentTime = taosGetTimestampMs();
int64_t elapsedTime = currentTime - startTime;
if (elapsedTime > timeout) {
tscInfo("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
tscDebug("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
tmq->consumerId, tmq->epoch, startTime, currentTime);
return NULL;
}

View File

@ -109,7 +109,7 @@ bool tsEnableTelem = true;
int32_t tsTelemInterval = 43200;
char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.tdengine.com";
uint16_t tsTelemPort = 80;
char * tsTelemUri = "/report";
char *tsTelemUri = "/report";
#ifdef TD_ENTERPRISE
bool tsEnableCrashReport = false;
@ -253,7 +253,7 @@ int32_t tsCompactPullupInterval = 10;
int32_t tsMqRebalanceInterval = 2;
int32_t tsStreamCheckpointInterval = 60;
float tsSinkDataRate = 2.0;
int32_t tsStreamNodeCheckInterval = 15;
int32_t tsStreamNodeCheckInterval = 16;
int32_t tsTtlUnit = 86400;
int32_t tsTtlPushIntervalSec = 10;
int32_t tsTrimVDbIntervalSec = 60 * 60; // interval of trimming db in all vgroups
@ -695,8 +695,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "audit", tsEnableAudit, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
if (cfgAddBool(pCfg, "auditCreateTable", tsEnableAuditCreateTable, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddInt32(pCfg, "auditInterval", tsAuditInterval, 500, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
return -1;
if (cfgAddInt32(pCfg, "auditInterval", tsAuditInterval, 500, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1;
if (cfgAddBool(pCfg, "telemetryReporting", tsEnableTelem, CFG_SCOPE_BOTH, CFG_DYN_ENT_SERVER) != 0) return -1;
@ -715,8 +714,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) !=
0)
return -1;
if (cfgAddInt32(pCfg, "compactPullupInterval", tsCompactPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) !=
0)
if (cfgAddInt32(pCfg, "compactPullupInterval", tsCompactPullupInterval, 1, 10000, CFG_SCOPE_SERVER,
CFG_DYN_ENT_SERVER) != 0)
return -1;
if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) !=
0)
@ -1382,7 +1381,7 @@ void taosCleanupCfg() {
typedef struct {
const char *optionName;
void * optionVar;
void *optionVar;
} OptionNameAndVar;
static int32_t taosCfgSetOption(OptionNameAndVar *pOptions, int32_t optionSize, SConfigItem *pItem, bool isDebugflag) {
@ -1395,7 +1394,7 @@ static int32_t taosCfgSetOption(OptionNameAndVar *pOptions, int32_t optionSize,
switch (pItem->dtype) {
case CFG_DTYPE_BOOL: {
int32_t flag = pItem->i32;
bool * pVar = pOptions[d].optionVar;
bool *pVar = pOptions[d].optionVar;
uInfo("%s set from %d to %d", optName, *pVar, flag);
*pVar = flag;
terrno = TSDB_CODE_SUCCESS;
@ -1470,8 +1469,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, char *name) {
{"stDebugFlag", &stDebugFlag}, {"sndDebugFlag", &sndDebugFlag},
};
static OptionNameAndVar options[] = {
{"audit", &tsEnableAudit},
static OptionNameAndVar options[] = {{"audit", &tsEnableAudit},
{"asynclog", &tsAsyncLog},
{"disableStream", &tsDisableStream},
{"enableWhiteList", &tsEnableWhiteList},
@ -1502,8 +1500,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, char *name) {
{"s3PageCacheSize", &tsS3PageCacheSize},
{"s3UploadDelaySec", &tsS3UploadDelaySec},
{"supportVnodes", &tsNumOfSupportVnodes},
{"experimental", &tsExperimental}
};
{"experimental", &tsExperimental}};
if (taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true) != 0) {
taosCfgSetOption(options, tListLen(options), pItem, false);
@ -1700,8 +1697,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) {
{"uDebugFlag", &uDebugFlag}, {"simDebugFlag", &simDebugFlag},
};
static OptionNameAndVar options[] = {
{"asyncLog", &tsAsyncLog},
static OptionNameAndVar options[] = {{"asyncLog", &tsAsyncLog},
{"assert", &tsAssert},
{"compressMsgSize", &tsCompressMsgSize},
{"countAlwaysReturnValue", &tsCountAlwaysReturnValue},
@ -1725,8 +1721,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) {
{"shellActivityTimer", &tsShellActivityTimer},
{"slowLogThreshold", &tsSlowLogThreshold},
{"useAdapter", &tsUseAdapter},
{"experimental", &tsExperimental}
};
{"experimental", &tsExperimental}};
if (taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true) != 0) {
taosCfgSetOption(options, tListLen(options), pItem, false);

View File

@ -683,6 +683,10 @@ int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* durati
return getDuration(*duration, *unit, duration, timePrecision);
}
static bool taosIsLeapYear(int32_t year) {
return (year % 4 == 0 && (year % 100 != 0 || year % 400 == 0));
}
int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) {
if (duration == 0) {
return t;
@ -702,7 +706,13 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) {
int32_t mon = tm.tm_year * 12 + tm.tm_mon + (int32_t)numOfMonth;
tm.tm_year = mon / 12;
tm.tm_mon = mon % 12;
int daysOfMonth[] = {31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31};
if (taosIsLeapYear(1900 + tm.tm_year)) {
daysOfMonth[1] = 29;
}
if (tm.tm_mday > daysOfMonth[tm.tm_mon]) {
tm.tm_mday = daysOfMonth[tm.tm_mon];
}
return (int64_t)(taosMktime(&tm) * TSDB_TICK_PER_SECOND(precision) + fraction);
}
@ -872,23 +882,33 @@ int64_t taosTimeTruncate(int64_t ts, const SInterval* pInterval) {
ASSERT(pInterval->offset >= 0);
if (pInterval->offset > 0) {
start = taosTimeAdd(start, pInterval->offset, pInterval->offsetUnit, precision);
// try to move current window to the left-hande-side, due to the offset effect.
int64_t end = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision) - 1;
int64_t newe = end;
int64_t newe = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision) - 1;
int64_t slidingStart = start;
while (newe >= ts) {
end = newe;
newe = taosTimeAdd(newe, -pInterval->sliding, pInterval->slidingUnit, precision);
start = slidingStart;
slidingStart = taosTimeAdd(slidingStart, -pInterval->sliding, pInterval->slidingUnit, precision);
int64_t slidingEnd = taosTimeAdd(slidingStart, pInterval->interval, pInterval->intervalUnit, precision) - 1;
newe = taosTimeAdd(slidingEnd, pInterval->offset, pInterval->offsetUnit, precision);
}
start = taosTimeAdd(end, -pInterval->interval, pInterval->intervalUnit, precision) + 1;
start = taosTimeAdd(start, pInterval->offset, pInterval->offsetUnit, precision);
}
return start;
}
// used together with taosTimeTruncate. when offset is great than zero, slide-start/slide-end is the anchor point
int64_t taosTimeGetIntervalEnd(int64_t intervalStart, const SInterval* pInterval) {
if (pInterval->offset > 0) {
int64_t slideStart = taosTimeAdd(intervalStart, -1 * pInterval->offset, pInterval->offsetUnit, pInterval->precision);
int64_t slideEnd = taosTimeAdd(slideStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
int64_t result = taosTimeAdd(slideEnd, pInterval->offset, pInterval->offsetUnit, pInterval->precision);
return result;
} else {
int64_t result = taosTimeAdd(intervalStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
return result;
}
}
// internal function, when program is paused in debugger,
// one can call this function from debugger to print a
// timestamp as human readable string, for example (gdb):

View File

@ -45,7 +45,7 @@ static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
SRetrieveIpWhiteReq req = {.ipWhiteVer = oldVer};
int32_t contLen = tSerializeRetrieveIpWhite(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen);
void * pHead = rpcMallocCont(contLen);
tSerializeRetrieveIpWhite(pHead, contLen, &req);
SRpcMsg rpcMsg = {.pCont = pHead,
@ -144,7 +144,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen);
void * pHead = rpcMallocCont(contLen);
tSerializeSStatusReq(pHead, contLen, &req);
tFreeSStatusReq(&req);
@ -161,7 +161,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
SEpSet epSet = {0};
int8_t epUpdated = 0;
dmGetMnodeEpSet(pMgmt->pData, &epSet);
rpcSendRecvWithTimeout(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, 5000);
rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 5 * 1000);
if (rpcRsp.code != 0) {
dmRotateMnodeEpSet(pMgmt->pData);
char tbuf[512];
@ -189,7 +189,7 @@ void dmSendNotifyReq(SDnodeMgmt *pMgmt) {
req.pVloads = vinfo.pVloads;
int32_t contLen = tSerializeSNotifyReq(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen);
void * pHead = rpcMallocCont(contLen);
tSerializeSNotifyReq(pHead, contLen, &req);
tFreeSNotifyReq(&req);
@ -284,7 +284,7 @@ int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
SSDataBlock *dmBuildVariablesBlock(void) {
SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
SSDataBlock * pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
size_t size = 0;
const SSysTableMeta *pMeta = NULL;
getInfosDbMeta(&pMeta, &size);

View File

@ -47,6 +47,7 @@ static void *dmStatusThreadFp(void *param) {
}
SDmNotifyHandle dmNotifyHdl = {.state = 0};
static void *dmNotifyThreadFp(void *param) {
SDnodeMgmt *pMgmt = param;
setThreadName("dnode-notify");

View File

@ -48,6 +48,8 @@ typedef struct {
typedef struct {
void *serverRpc;
void *clientRpc;
void *statusRpc;
void *syncRpc;
SDnodeHandle msgHandles[TDMT_MAX];
} SDnodeTrans;
@ -136,8 +138,10 @@ int32_t dmInitServer(SDnode *pDnode);
void dmCleanupServer(SDnode *pDnode);
int32_t dmInitClient(SDnode *pDnode);
int32_t dmInitStatusClient(SDnode *pDnode);
int32_t dmInitSyncClient(SDnode *pDnode);
void dmCleanupClient(SDnode *pDnode);
void dmCleanupStatusClient(SDnode *pDnode);
void dmCleanupSyncClient(SDnode *pDnode);
SMsgCb dmGetMsgcb(SDnode *pDnode);
#ifdef TD_MODULE_OPTIMIZE
int32_t dmInitMsgHandle(SDnode *pDnode, SMgmtWrapper *wrappers);

View File

@ -94,6 +94,9 @@ int32_t dmInitDnode(SDnode *pDnode) {
indexInit(tsNumOfCommitThreads);
streamMetaInit();
dmInitStatusClient(pDnode);
dmInitSyncClient(pDnode);
dmReportStartup("dnode-transport", "initialized");
dDebug("dnode is created, ptr:%p", pDnode);
code = 0;
@ -115,7 +118,9 @@ void dmCleanupDnode(SDnode *pDnode) {
dmCleanupClient(pDnode);
dmCleanupStatusClient(pDnode);
dmCleanupSyncClient(pDnode);
dmCleanupServer(pDnode);
dmClearVars(pDnode);
rpcCleanup();
streamMetaCleanup();

View File

@ -322,6 +322,23 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
return 0;
}
}
static inline int32_t dmSendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
SDnode *pDnode = dmInstance();
if (pDnode->status != DND_STAT_RUNNING && pMsg->msgType < TDMT_SYNC_MSG) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
if (pDnode->status == DND_STAT_INIT) {
terrno = TSDB_CODE_APP_IS_STARTING;
} else {
terrno = TSDB_CODE_APP_IS_STOPPING;
}
dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), terrstr(), pMsg->info.handle);
return -1;
} else {
rpcSendRequest(pDnode->trans.syncRpc, pEpSet, pMsg, NULL);
return 0;
}
}
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { rpcRegisterBrokenLinkArg(pMsg); }
@ -346,8 +363,8 @@ int32_t dmInitClient(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
SRpcInit rpcInit = {0};
rpcInit.label = "DND-C";
rpcInit.numOfThreads = tsNumOfRpcThreads;
rpcInit.label = "DNODE-CLI";
rpcInit.numOfThreads = tsNumOfRpcThreads / 2;
rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT;
@ -366,7 +383,7 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.failFastThreshold = 3; // failed threshold
rpcInit.ffp = dmFailFastFp;
int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2;
connLimitNum = TMAX(connLimitNum, 10);
connLimitNum = TMIN(connLimitNum, 500);
@ -390,7 +407,7 @@ int32_t dmInitStatusClient(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
SRpcInit rpcInit = {0};
rpcInit.label = "DND-STATUS";
rpcInit.label = "DNODE-STA-CLI";
rpcInit.numOfThreads = 1;
rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
rpcInit.sessions = 1024;
@ -421,16 +438,61 @@ int32_t dmInitStatusClient(SDnode *pDnode) {
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
// pTrans->statusClientRpc = rpcOpen(&rpcInit);
// if (pTrans->statusClientRpc == NULL) {
// dError("failed to init dnode rpc status client");
// return -1;
// }
pTrans->statusRpc = rpcOpen(&rpcInit);
if (pTrans->statusRpc == NULL) {
dError("failed to init dnode rpc status client");
return -1;
}
dDebug("dnode rpc status client is initialized");
return 0;
}
int32_t dmInitSyncClient(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
SRpcInit rpcInit = {0};
rpcInit.label = "DNODE-SYNC-CLI";
rpcInit.numOfThreads = tsNumOfRpcThreads / 2;
rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = TSDB_DEFAULT_USER;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.parent = pDnode;
rpcInit.rfp = rpcRfp;
rpcInit.compressSize = tsCompressMsgSize;
rpcInit.retryMinInterval = tsRedirectPeriod;
rpcInit.retryStepFactor = tsRedirectFactor;
rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
rpcInit.failFastInterval = 5000; // interval threshold(ms)
rpcInit.failFastThreshold = 3; // failed threshold
rpcInit.ffp = dmFailFastFp;
int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2;
connLimitNum = TMAX(connLimitNum, 10);
connLimitNum = TMIN(connLimitNum, 500);
rpcInit.connLimitNum = connLimitNum;
rpcInit.connLimitLock = 1;
rpcInit.supportBatch = 1;
rpcInit.batchSize = 8 * 1024;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
pTrans->syncRpc = rpcOpen(&rpcInit);
if (pTrans->syncRpc == NULL) {
dError("failed to init dnode rpc sync client");
return -1;
}
dDebug("dnode rpc sync client is initialized");
return 0;
}
void dmCleanupClient(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
if (pTrans->clientRpc) {
@ -441,11 +503,19 @@ void dmCleanupClient(SDnode *pDnode) {
}
void dmCleanupStatusClient(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
// if (pTrans->statusClientRpc) {
// rpcClose(pTrans->statusClientRpc);
// pTrans->statusClientRpc = NULL;
// dDebug("dnode rpc status client is closed");
// }
if (pTrans->statusRpc) {
rpcClose(pTrans->statusRpc);
pTrans->statusRpc = NULL;
dDebug("dnode rpc status client is closed");
}
}
void dmCleanupSyncClient(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
if (pTrans->syncRpc) {
rpcClose(pTrans->syncRpc);
pTrans->syncRpc = NULL;
dDebug("dnode rpc sync client is closed");
}
}
int32_t dmInitServer(SDnode *pDnode) {
@ -486,7 +556,10 @@ SMsgCb dmGetMsgcb(SDnode *pDnode) {
SMsgCb msgCb = {
.clientRpc = pDnode->trans.clientRpc,
.serverRpc = pDnode->trans.serverRpc,
.statusRpc = pDnode->trans.statusRpc,
.syncRpc = pDnode->trans.syncRpc,
.sendReqFp = dmSendReq,
.sendSyncReqFp = dmSendSyncReq,
.sendRspFp = dmSendRsp,
.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,
.releaseHandleFp = dmReleaseHandle,

View File

@ -32,6 +32,10 @@ int32_t sendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
int32_t sendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
char *i642str(int64_t val) {
static char str[24] = {0};
@ -568,6 +572,7 @@ void mndDumpSdb() {
SMsgCb msgCb = {0};
msgCb.reportStartupFp = reportStartup;
msgCb.sendReqFp = sendReq;
msgCb.sendSyncReqFp = sendSyncReq;
msgCb.sendRspFp = sendRsp;
msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack
tmsgSetDefault(&msgCb);
@ -590,7 +595,7 @@ void mndDumpSdb() {
dumpTopic(pSdb, json);
dumpConsumer(pSdb, json);
dumpSubscribe(pSdb, json);
// dumpOffset(pSdb, json);
// dumpOffset(pSdb, json);
dumpStream(pSdb, json);
dumpAcct(pSdb, json);
dumpAuth(pSdb, json);
@ -605,7 +610,7 @@ void mndDumpSdb() {
char *pCont = tjsonToString(json);
int32_t contLen = strlen(pCont);
char file[] = "sdb.json";
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC| TD_FILE_WRITE_THROUGH);
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to write %s since %s", file, terrstr());

View File

@ -145,7 +145,7 @@ static void mndCalMqRebalance(SMnode *pMnode) {
void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TMQ_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
}
@ -299,14 +299,14 @@ static bool mnodeIsNotLeader(SMnode *pMnode) {
}
static int32_t minCronTime() {
int64_t min = INT64_MAX;
int32_t min = INT32_MAX;
min = TMIN(min, tsTtlPushIntervalSec);
min = TMIN(min, tsTrimVDbIntervalSec);
min = TMIN(min, tsTransPullupInterval);
min = TMIN(min, tsCompactPullupInterval);
min = TMIN(min, tsMqRebalanceInterval);
min = TMIN(min, tsStreamCheckpointInterval);
min = TMIN(min, 5); // checkpointRemain
min = TMIN(min, 6); // checkpointRemain
min = TMIN(min, tsStreamNodeCheckInterval);
int64_t telemInt = TMIN(60, (tsTelemInterval - 1));
@ -386,7 +386,8 @@ static void *mndThreadFp(void *param) {
int64_t minCron = minCronTime();
if (sec % minCron == 0 && mnodeIsNotLeader(pMnode)) {
// not leader, do nothing
mTrace("timer not process since mnode is not leader, reason: %s", tstrerror(terrno)) terrno = 0;
mTrace("timer not process since mnode is not leader, reason: %s", tstrerror(terrno));
terrno = 0;
continue;
}
mndDoTimerPullupTask(pMnode, sec);

View File

@ -92,6 +92,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamUid, const char*
if (strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) {
mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid,
tInfo.name);
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
return true;
} else {
mDebug("not conflict with checkpoint trans, name:%s, continue create trans", pTransName);
@ -100,6 +101,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamUid, const char*
(strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0)) {
mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid,
tInfo.name);
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
return true;
}
} else {

View File

@ -747,7 +747,7 @@ static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj* rebSubHash) {
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
int32_t status = atomic_load_32(&pConsumer->status);
mInfo("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d",
mDebug("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d",
pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime,
hbStatus);

View File

@ -61,6 +61,7 @@ class MndTestTrans2 : public ::testing::Test {
static SMsgCb msgCb = {0};
msgCb.reportStartupFp = reportStartup;
msgCb.sendReqFp = sendReq;
msgCb.sendSyncReqFp = sendSyncReq;
msgCb.sendRspFp = sendRsp;
msgCb.queueFps[SYNC_QUEUE] = putToQueue;
msgCb.queueFps[WRITE_QUEUE] = putToQueue;

View File

@ -963,7 +963,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// let's decide which step should be executed now
if (pTask->execInfo.step1Start == 0) {
int64_t ts = taosGetTimestampMs();
pTask->execInfo.step1Start = ts;
tqDebug("s-task:%s start scan-history stage(step 1), status:%s, step1 startTs:%" PRId64, id, pStatus, ts);
} else {

View File

@ -820,15 +820,23 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
return 0;
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
if (pTask != NULL) {
ASSERT(streamTaskReadyToRun(pTask, NULL));
char* pStatus = NULL;
if (streamTaskReadyToRun(pTask, &pStatus)) {
int64_t execTs = pTask->status.lastExecTs;
int32_t idle = taosGetTimestampMs() - execTs;
tqDebug("s-task:%s task resume to run after idle for:%dms from:%" PRId64, pTask->id.idStr, idle, execTs);
streamResumeTask(pTask);
} else {
int8_t status = streamTaskSetSchedStatusInactive(pTask);
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
pTask->id.idStr, pStatus, status);
}
streamMetaReleaseTask(pMeta, pTask);
}
return 0;
}

View File

@ -1990,9 +1990,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
if (SFSNEXTROW_FILESET == state->state) {
_next_fileset:
if (--state->iFileSet < 0) {
clearLastFileSet(state);
if (--state->iFileSet < 0) {
*ppRow = NULL;
return code;
} else {
@ -2862,7 +2862,9 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
taosArraySet(pColArray, iCol, &lastCol);
int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ);
if (aColIndex >= 0) {
taosArrayRemove(aColArray, aColIndex);
}
} else if (!COL_VAL_IS_VALUE(tColVal) && !COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
noneCol = iCol;
setNoneCol = true;

View File

@ -73,6 +73,7 @@ static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order
static void resetTableListIndex(SReaderStatus* pStatus);
static void getMemTableTimeRange(STsdbReader* pReader, int64_t* pMaxKey, int64_t* pMinKey);
static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo);
static int32_t buildFromPreFilesetBuffer(STsdbReader* pReader);
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
@ -3040,6 +3041,17 @@ static ERetrieveType doReadDataFromSttFiles(STsdbReader* pReader) {
return TSDB_READ_RETURN;
}
if (pReader->status.bProcMemPreFileset) {
code = buildFromPreFilesetBuffer(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (pResBlock->info.rows > 0) {
pReader->status.processingMemPreFileSet = true;
return TSDB_READ_RETURN;
}
}
if (pBlockIter->numOfBlocks > 0) { // there are data blocks existed.
return TSDB_READ_CONTINUE;
} else { // all blocks in data file are checked, let's check the data in last files
@ -4297,6 +4309,7 @@ static int32_t buildFromPreFilesetBuffer(STsdbReader* pReader) {
} else {
tsdbDebug("finished pre-fileset %d buffer processing. %s", fid, pReader->idStr);
pStatus->bProcMemPreFileset = false;
pStatus->processingMemPreFileSet = false;
if (pReader->notifyFn) {
STsdReaderNotifyInfo info = {0};
info.duration.filesetId = fid;
@ -4329,7 +4342,7 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) {
pStatus->bProcMemFirstFileset, pReader->idStr);
if (pStatus->bProcMemPreFileset) {
if (pBlock->info.rows > 0) {
if (pReader->notifyFn) {
if (pReader->notifyFn && !pReader->status.processingMemPreFileSet) {
int32_t fid = pReader->status.pCurrentFileset->fid;
STsdReaderNotifyInfo info = {0};
info.duration.filesetId = fid;

View File

@ -238,6 +238,7 @@ typedef struct SReaderStatus {
int64_t prevFilesetStartKey;
int64_t prevFilesetEndKey;
bool bProcMemFirstFileset;
bool processingMemPreFileSet;
STableUidList procMemUidList;
STableBlockScanInfo** pProcMemTableIter;
} SReaderStatus;

View File

@ -14,11 +14,11 @@
*/
#define _DEFAULT_SOURCE
#include "tq.h"
#include "sync.h"
#include "tq.h"
#include "tqCommon.h"
#include "tsdb.h"
#include "vnd.h"
#include "tqCommon.h"
#define BATCH_ENABLE 0
@ -411,7 +411,7 @@ static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
}
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
int32_t code = tmsgSendReq(pEpSet, pMsg);
int32_t code = tmsgSendSyncReq(pEpSet, pMsg);
if (code != 0) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
@ -555,7 +555,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
walApplyVer(pVnode->pWal, commitIdx);
pVnode->restored = true;
SStreamMeta* pMeta = pVnode->pTq->pStreamMeta;
SStreamMeta *pMeta = pVnode->pTq->pStreamMeta;
streamMetaWLock(pMeta);
if (pMeta->startInfo.tasksWillRestart) {

View File

@ -1838,7 +1838,7 @@ static STimeWindow doCalculateTimeWindow(int64_t ts, SInterval* pInterval) {
STimeWindow w = {0};
w.skey = taosTimeTruncate(ts, pInterval);
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
w.ekey = taosTimeGetIntervalEnd(w.skey, pInterval);
return w;
}
@ -1887,31 +1887,17 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI
}
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order) {
int64_t slidingStart = 0;
if (pInterval->offset > 0) {
slidingStart = taosTimeAdd(tw->skey, -1 * pInterval->offset, pInterval->offsetUnit, pInterval->precision);
} else {
slidingStart = tw->skey;
}
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
if (!IS_CALENDAR_TIME_DURATION(pInterval->slidingUnit)) {
tw->skey += pInterval->sliding * factor;
tw->ekey = taosTimeAdd(tw->skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
return;
}
// convert key to second
int64_t key = convertTimePrecision(tw->skey, pInterval->precision, TSDB_TIME_PRECISION_MILLI) / 1000;
int64_t duration = pInterval->sliding;
if (pInterval->slidingUnit == 'y') {
duration *= 12;
}
struct tm tm;
time_t t = (time_t)key;
taosLocalTime(&t, &tm, NULL);
int mon = (int)(tm.tm_year * 12 + tm.tm_mon + duration * factor);
tm.tm_year = mon / 12;
tm.tm_mon = mon % 12;
tw->skey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);
tw->ekey = taosTimeAdd(tw->skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
slidingStart = taosTimeAdd(slidingStart, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
tw->skey = taosTimeAdd(slidingStart, pInterval->offset, pInterval->offsetUnit, pInterval->precision);
int64_t slidingEnd = taosTimeAdd(slidingStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
tw->ekey = taosTimeAdd(slidingEnd, pInterval->offset, pInterval->offsetUnit, pInterval->precision);
}
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) {

View File

@ -922,8 +922,8 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan
pStreamInfo->fillHistoryWindow = *pWindow;
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE2;
qDebug("%s step 2. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64
", window:%" PRId64 " - %" PRId64,
qDebug("%s step 2. set param for stream scanner scan wal, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64
" - %" PRId64,
GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey,
pWindow->ekey);
return 0;
@ -1129,7 +1129,7 @@ int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow;
qDebug("%s remove scan-history filter window:%" PRId64 "-%" PRId64 ", set new window:%" PRId64 "-%" PRId64,
qDebug("%s remove timeWindow filter:%" PRId64 "-%" PRId64 ", set new window:%" PRId64 "-%" PRId64,
GET_TASKID(pTaskInfo), pWindow->skey, pWindow->ekey, INT64_MIN, INT64_MAX);
pWindow->skey = INT64_MIN;

View File

@ -449,7 +449,7 @@ STimeWindow getAlignQueryTimeWindow(const SInterval* pInterval, int64_t key) {
* if the realSkey > INT64_MAX - pInterval->interval, the query duration between
* realSkey and realEkey must be less than one interval.Therefore, no need to adjust the query ranges.
*/
win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
win.ekey = taosTimeGetIntervalEnd(win.skey, pInterval);
if (win.ekey < win.skey) {
win.ekey = INT64_MAX;
}

View File

@ -427,6 +427,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
taosMemoryFreeClear(param);
}
#ifdef BUILD_NO_CALL
static bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) {
for (int32_t i = 0; i < numOfCols; i++) {
if (fmIsUserDefinedFunc(pFCtx[i].functionId) || !fmIsInvertible(pFCtx[i].functionId)) {
@ -435,6 +436,7 @@ static bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) {
}
return true;
}
#endif
void reloadFromDownStream(SOperatorInfo* downstream, SStreamIntervalOperatorInfo* pInfo) {
SStateStore* pAPI = &downstream->pTaskInfo->storageAPI.stateStore;
@ -3846,6 +3848,7 @@ _error:
return NULL;
}
#ifdef BUILD_NO_CALL
static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type) {
for (int i = 0; i < num; i++) {
if (type == STREAM_INVERT) {
@ -3855,6 +3858,7 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type
}
}
}
#endif
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
@ -3947,9 +3951,11 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
// caller. Note that all the time window are not close till now.
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
#ifdef BUILD_NO_CALL
if (pInfo->invertible) {
setInverFunction(pSup->pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.type);
}
#endif
doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);

View File

@ -450,7 +450,7 @@ int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBl
TSKEY next = primaryKeys[startPos];
if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
pNext->skey = taosTimeTruncate(next, pInterval);
pNext->ekey = taosTimeAdd(pNext->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
} else {
pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
pNext->skey = pNext->ekey - pInterval->interval + 1;
@ -459,7 +459,7 @@ int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBl
TSKEY next = primaryKeys[startPos];
if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
pNext->skey = taosTimeTruncate(next, pInterval);
pNext->ekey = taosTimeAdd(pNext->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
} else {
pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
pNext->ekey = pNext->skey + pInterval->interval - 1;
@ -1079,16 +1079,6 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
return (rows == 0) ? NULL : pBlock;
}
static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type) {
for (int i = 0; i < num; i++) {
if (type == STREAM_INVERT) {
fmSetInvertFunc(pCtx[i].functionId, &(pCtx[i].fpSet));
} else if (type == STREAM_NORMAL) {
fmSetNormalFunc(pCtx[i].functionId, &(pCtx[i].fpSet));
}
}
}
static void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprSupp* pSup, int32_t numOfOutput) {
SResultRow* pResult = getResultRowByPos(pResultBuf, p1, false);
if (NULL == pResult) {

View File

@ -158,4 +158,32 @@ TEST(testCase, timewindow_gen) {
}
TEST(testCase, timewindow_natural) {
osSetTimezone("CST");
int32_t precision = TSDB_TIME_PRECISION_MILLI;
SInterval interval2 = createInterval(17, 17, 13392000000, 'n', 'n', 0, precision);
int64_t key = 1648970865984;
STimeWindow w0 = getAlignQueryTimeWindow(&interval2, key);
printTimeWindow(&w0, precision, key);
ASSERT_GE(w0.ekey, key);
int64_t key1 = 1633446027072;
STimeWindow w1 = {0};
getInitialStartTimeWindow(&interval2, key1, &w1, true);
printTimeWindow(&w1, precision, key1);
STimeWindow w3 = getAlignQueryTimeWindow(&interval2, key1);
printf("%ld win %ld, %ld\n", key1, w3.skey, w3.ekey);
int64_t key2 = 1648758398208;
STimeWindow w2 = {0};
getInitialStartTimeWindow(&interval2, key2, &w2, true);
printTimeWindow(&w2, precision, key2);
STimeWindow w4 = getAlignQueryTimeWindow(&interval2, key2);
printf("%ld win %ld, %ld\n", key2, w3.skey, w3.ekey);
ASSERT_EQ(w3.skey, w4.skey);
ASSERT_EQ(w3.ekey, w4.ekey);
}
#pragma GCC diagnostic pop

View File

@ -40,7 +40,9 @@ typedef struct SBuiltinFuncDefinition {
FExecProcess processFunc;
FScalarExecProcess sprocessFunc;
FExecFinalize finalizeFunc;
#ifdef BUILD_NO_CALL
FExecProcess invertFunc;
#endif
FExecCombine combineFunc;
const char* pPartialFunc;
const char* pMergeFunc;

View File

@ -59,12 +59,19 @@ int32_t combineFunction(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
bool getCountFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t countFunction(SqlFunctionCtx* pCtx);
#ifdef BUILD_NO_CALL
int32_t countInvertFunction(SqlFunctionCtx* pCtx);
#endif
EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
bool getSumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t sumFunction(SqlFunctionCtx* pCtx);
#ifdef BUILD_NO_CALL
int32_t sumInvertFunction(SqlFunctionCtx* pCtx);
#endif
int32_t sumCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
bool minmaxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
@ -81,7 +88,11 @@ int32_t avgFunction(SqlFunctionCtx* pCtx);
int32_t avgFunctionMerge(SqlFunctionCtx* pCtx);
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t avgPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
#ifdef BUILD_NO_CALL
int32_t avgInvertFunction(SqlFunctionCtx* pCtx);
#endif
int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t getAvgInfoSize();
@ -91,7 +102,11 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx);
int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx);
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t stddevPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
#ifdef BUILD_NO_CALL
int32_t stddevInvertFunction(SqlFunctionCtx* pCtx);
#endif
int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t getStddevInfoSize();
@ -99,7 +114,6 @@ bool getLeastSQRFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool leastSQRFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
int32_t leastSQRFunction(SqlFunctionCtx* pCtx);
int32_t leastSQRFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t leastSQRInvertFunction(SqlFunctionCtx* pCtx);
int32_t leastSQRCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);

View File

@ -2368,7 +2368,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = countFunction,
.sprocessFunc = countScalarFunction,
.finalizeFunc = functionFinalize,
#ifdef BUILD_NO_CALL
.invertFunc = countInvertFunction,
#endif
.combineFunc = combineFunction,
.pPartialFunc = "count",
.pMergeFunc = "sum"
@ -2384,7 +2386,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = sumFunction,
.sprocessFunc = sumScalarFunction,
.finalizeFunc = functionFinalize,
#ifdef BUILD_NO_CALL
.invertFunc = sumInvertFunction,
#endif
.combineFunc = sumCombine,
.pPartialFunc = "sum",
.pMergeFunc = "sum"
@ -2429,7 +2433,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = stddevFunction,
.sprocessFunc = stddevScalarFunction,
.finalizeFunc = stddevFinalize,
#ifdef BUILD_NO_CALL
.invertFunc = stddevInvertFunction,
#endif
.combineFunc = stddevCombine,
.pPartialFunc = "_stddev_partial",
.pMergeFunc = "_stddev_merge"
@ -2443,7 +2449,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = stddevFunctionSetup,
.processFunc = stddevFunction,
.finalizeFunc = stddevPartialFinalize,
#ifdef BUILD_NO_CALL
.invertFunc = stddevInvertFunction,
#endif
.combineFunc = stddevCombine,
},
{
@ -2455,7 +2463,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = stddevFunctionSetup,
.processFunc = stddevFunctionMerge,
.finalizeFunc = stddevFinalize,
#ifdef BUILD_NO_CALL
.invertFunc = stddevInvertFunction,
#endif
.combineFunc = stddevCombine,
},
{
@ -2468,7 +2478,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = leastSQRFunction,
.sprocessFunc = leastSQRScalarFunction,
.finalizeFunc = leastSQRFinalize,
#ifdef BUILD_NO_CALL
.invertFunc = NULL,
#endif
.combineFunc = leastSQRCombine,
},
{
@ -2482,7 +2494,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = avgFunction,
.sprocessFunc = avgScalarFunction,
.finalizeFunc = avgFinalize,
#ifdef BUILD_NO_CALL
.invertFunc = avgInvertFunction,
#endif
.combineFunc = avgCombine,
.pPartialFunc = "_avg_partial",
.pMergeFunc = "_avg_merge"
@ -2497,7 +2511,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = avgFunctionSetup,
.processFunc = avgFunction,
.finalizeFunc = avgPartialFinalize,
#ifdef BUILD_NO_CALL
.invertFunc = avgInvertFunction,
#endif
.combineFunc = avgCombine,
},
{
@ -2509,7 +2525,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = avgFunctionSetup,
.processFunc = avgFunctionMerge,
.finalizeFunc = avgFinalize,
#ifdef BUILD_NO_CALL
.invertFunc = avgInvertFunction,
#endif
.combineFunc = avgCombine,
},
{
@ -2523,7 +2541,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = percentileFunction,
.sprocessFunc = percentileScalarFunction,
.finalizeFunc = percentileFinalize,
#ifdef BUILD_NO_CALL
.invertFunc = NULL,
#endif
.combineFunc = NULL,
},
{
@ -2536,7 +2556,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = apercentileFunction,
.sprocessFunc = apercentileScalarFunction,
.finalizeFunc = apercentileFinalize,
#ifdef BUILD_NO_CALL
.invertFunc = NULL,
#endif
.combineFunc = apercentileCombine,
.pPartialFunc = "_apercentile_partial",
.pMergeFunc = "_apercentile_merge",
@ -2551,7 +2573,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = apercentileFunctionSetup,
.processFunc = apercentileFunction,
.finalizeFunc = apercentilePartialFinalize,
#ifdef BUILD_NO_CALL
.invertFunc = NULL,
#endif
.combineFunc = apercentileCombine,
},
{
@ -2563,7 +2587,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = apercentileFunctionSetup,
.processFunc = apercentileFunctionMerge,
.finalizeFunc = apercentileFinalize,
#ifdef BUILD_NO_CALL
.invertFunc = NULL,
#endif
.combineFunc = apercentileCombine,
},
{
@ -2609,7 +2635,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = spreadFunction,
.sprocessFunc = spreadScalarFunction,
.finalizeFunc = spreadFinalize,
#ifdef BUILD_NO_CALL
.invertFunc = NULL,
#endif
.combineFunc = spreadCombine,
.pPartialFunc = "_spread_partial",
.pMergeFunc = "_spread_merge"
@ -2624,7 +2652,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = spreadFunctionSetup,
.processFunc = spreadFunction,
.finalizeFunc = spreadPartialFinalize,
#ifdef BUILD_NO_CALL
.invertFunc = NULL,
#endif
.combineFunc = spreadCombine,
},
{
@ -2637,7 +2667,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = spreadFunctionSetup,
.processFunc = spreadFunctionMerge,
.finalizeFunc = spreadFinalize,
#ifdef BUILD_NO_CALL
.invertFunc = NULL,
#endif
.combineFunc = spreadCombine,
},
{
@ -2651,7 +2683,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = elapsedFunctionSetup,
.processFunc = elapsedFunction,
.finalizeFunc = elapsedFinalize,
#ifdef BUILD_NO_CALL
.invertFunc = NULL,
#endif
.combineFunc = elapsedCombine,
},
{
@ -2664,7 +2698,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = elapsedFunctionSetup,
.processFunc = elapsedFunction,
.finalizeFunc = elapsedPartialFinalize,
#ifdef BUILD_NO_CALL
.invertFunc = NULL,
#endif
.combineFunc = elapsedCombine,
},
{
@ -2677,7 +2713,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = elapsedFunctionSetup,
.processFunc = elapsedFunctionMerge,
.finalizeFunc = elapsedFinalize,
#ifdef BUILD_NO_CALL
.invertFunc = NULL,
#endif
.combineFunc = elapsedCombine,
},
{
@ -2907,7 +2945,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = histogramFunction,
.sprocessFunc = histogramScalarFunction,
.finalizeFunc = histogramFinalize,
#ifdef BUILD_NO_CALL
.invertFunc = NULL,
#endif
.combineFunc = histogramCombine,
.pPartialFunc = "_histogram_partial",
.pMergeFunc = "_histogram_merge",
@ -2921,7 +2961,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = histogramFunctionSetup,
.processFunc = histogramFunctionPartial,
.finalizeFunc = histogramPartialFinalize,
#ifdef BUILD_NO_CALL
.invertFunc = NULL,
#endif
.combineFunc = histogramCombine,
},
{
@ -2933,7 +2975,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = functionSetup,
.processFunc = histogramFunctionMerge,
.finalizeFunc = histogramFinalize,
#ifdef BUILD_NO_CALL
.invertFunc = NULL,
#endif
.combineFunc = histogramCombine,
},
{
@ -2946,7 +2990,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = hllFunction,
.sprocessFunc = hllScalarFunction,
.finalizeFunc = hllFinalize,
#ifdef BUILD_NO_CALL
.invertFunc = NULL,
#endif
.combineFunc = hllCombine,
.pPartialFunc = "_hyperloglog_partial",
.pMergeFunc = "_hyperloglog_merge"
@ -2960,7 +3006,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = functionSetup,
.processFunc = hllFunction,
.finalizeFunc = hllPartialFinalize,
#ifdef BUILD_NO_CALL
.invertFunc = NULL,
#endif
.combineFunc = hllCombine,
},
{
@ -2972,7 +3020,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = functionSetup,
.processFunc = hllFunctionMerge,
.finalizeFunc = hllFinalize,
#ifdef BUILD_NO_CALL
.invertFunc = NULL,
#endif
.combineFunc = hllCombine,
},
{

View File

@ -549,6 +549,7 @@ int32_t countFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS;
}
#ifdef BUILD_NO_CALL
int32_t countInvertFunction(SqlFunctionCtx* pCtx) {
int64_t numOfElem = getNumOfElems(pCtx);
@ -559,6 +560,7 @@ int32_t countInvertFunction(SqlFunctionCtx* pCtx) {
SET_VAL(pResInfo, *((int64_t*)buf), 1);
return TSDB_CODE_SUCCESS;
}
#endif
int32_t combineFunction(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
@ -642,6 +644,7 @@ _sum_over:
return TSDB_CODE_SUCCESS;
}
#ifdef BUILD_NO_CALL
int32_t sumInvertFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElem = 0;
@ -699,6 +702,7 @@ int32_t sumInvertFunction(SqlFunctionCtx* pCtx) {
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
return TSDB_CODE_SUCCESS;
}
#endif
int32_t sumCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
@ -1230,6 +1234,7 @@ int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS;
}
#ifdef BUILD_NO_CALL
int32_t stddevInvertFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElem = 0;
@ -1294,6 +1299,7 @@ int32_t stddevInvertFunction(SqlFunctionCtx* pCtx) {
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
return TSDB_CODE_SUCCESS;
}
#endif
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SInputColumnInfoData* pInput = &pCtx->input;
@ -1578,11 +1584,6 @@ int32_t leastSQRFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return pResInfo->numOfRes;
}
int32_t leastSQRInvertFunction(SqlFunctionCtx* pCtx) {
// TODO
return TSDB_CODE_SUCCESS;
}
int32_t leastSQRCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
SLeastSQRInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);

View File

@ -724,6 +724,7 @@ int32_t avgFunctionMerge(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS;
}
#ifdef BUILD_NO_CALL
int32_t avgInvertFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElem = 0;
@ -786,6 +787,7 @@ int32_t avgInvertFunction(SqlFunctionCtx* pCtx) {
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
return TSDB_CODE_SUCCESS;
}
#endif
int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);

View File

@ -281,6 +281,7 @@ void fmFuncMgtDestroy() {
}
}
#ifdef BUILD_NO_CALL
int32_t fmSetInvertFunc(int32_t funcId, SFuncExecFuncs* pFpSet) {
if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
return TSDB_CODE_FAILED;
@ -314,6 +315,7 @@ bool fmIsInvertible(int32_t funcId) {
}
return res;
}
#endif
// function has same input/output type
bool fmIsSameInOutType(int32_t funcId) {

View File

@ -215,6 +215,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
int32_t status = 0;
SStreamMeta* pMeta = pTask->pMeta;
const char* id = pTask->id.idStr;
stDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64 ", msgId:%d", id,
@ -223,7 +224,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);
ASSERT(pInfo != NULL);
if (pTask->pMeta->role == NODE_ROLE_FOLLOWER) {
if (pMeta->role == NODE_ROLE_FOLLOWER) {
stError("s-task:%s task on follower received dispatch msgs, dispatch msg rejected", id);
status = TASK_INPUT_STATUS__REFUSED;
} else {
@ -244,6 +245,22 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
stDebug("s-task:%s close inputQ for upstream:0x%x, msgId:%d", id, pReq->upstreamTaskId, pReq->msgId);
} else if (pReq->type == STREAM_INPUT__TRANS_STATE) {
atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
// disable the related stream task here to avoid it to receive the newly arrived data after the transfer-state
STaskId* pRelTaskId = &pTask->streamTaskId;
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pRelTaskId->streamId, pRelTaskId->taskId);
if (pStreamTask != NULL) {
atomic_add_fetch_32(&pStreamTask->upstreamInfo.numOfClosed, 1);
streamTaskCloseUpstreamInput(pStreamTask, pReq->upstreamRelTaskId);
streamMetaReleaseTask(pMeta, pStreamTask);
}
stDebug("s-task:%s close inputQ for upstream:0x%x since trans-state msgId:%d recv, rel stream-task:0x%" PRIx64
" close inputQ for upstream:0x%x",
id, pReq->upstreamTaskId, pReq->msgId, pTask->streamTaskId.taskId, pReq->upstreamRelTaskId);
}
status = streamTaskAppendInputBlocks(pTask, pReq);
@ -252,9 +269,9 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
}
// disable the data from upstream tasks
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__HALT) {
status = TASK_INPUT_STATUS__BLOCKED;
}
// if (streamTaskGetStatus(pTask)->state == TASK_STATUS__HALT) {
// status = TASK_INPUT_STATUS__BLOCKED;
// }
{
// do send response with the input status
@ -295,6 +312,7 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
}
pTask->upstreamInfo.numOfClosed = 0;
stDebug("s-task:%s opening up inputQ from upstream tasks", pTask->id.idStr);
}
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {

View File

@ -58,6 +58,7 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamRelTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1;
ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
@ -84,6 +85,7 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamRelTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->totalLen) < 0) return -1;
@ -114,6 +116,7 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas
pReq->upstreamTaskId = pTask->id.taskId;
pReq->upstreamChildId = pTask->info.selfChildId;
pReq->upstreamNodeId = pTask->info.nodeId;
pReq->upstreamRelTaskId = pTask->streamTaskId.taskId;
pReq->blockNum = numOfBlocks;
pReq->taskId = dstTaskId;
pReq->type = type;

View File

@ -371,7 +371,6 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// In case of sink tasks, no need to halt them.
// In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to
// start the task state transfer procedure.
// char* p = NULL;
SStreamTaskState* pState = streamTaskGetStatus(pStreamTask);
status = pState->state;
char* p = pState->name;
@ -392,8 +391,12 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
}
// 1. expand the query time window for stream task of WAL scanner
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
pTimeWindow->skey = INT64_MIN;
qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);
} else {
stDebug("s-task:%s non-source task no need to reset filter window", pStreamTask->id.idStr);
}
// 2. transfer the ownership of executor state
streamTaskReleaseState(pTask);
@ -407,10 +410,13 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// 4. free it and remove fill-history task from disk meta-store
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
// 5. save to disk
// 5. assign the status to the value that will be kept in disk
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask)->state;
// 6. add empty delete block
// 6. open the inputQ for all upstream tasks
streamTaskOpenAllUpstreamInput(pStreamTask);
// 7. add empty delete block
if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) {
SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
@ -430,6 +436,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
int32_t code = TSDB_CODE_SUCCESS;
SStreamMeta* pMeta = pTask->pMeta;
ASSERT(pTask->status.appendTranstateBlock == 1);
int32_t level = pTask->info.taskLevel;
@ -439,8 +447,14 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states.
code = streamDoTransferStateToStreamTask(pTask);
} else { // drop fill-history task
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pTask->pMeta->vgId, &pTask->id);
} else { // drop fill-history task and open inputQ of sink task
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
if (pStreamTask != NULL) {
streamTaskOpenAllUpstreamInput(pStreamTask);
streamMetaReleaseTask(pMeta, pStreamTask);
}
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
}
return code;
@ -496,16 +510,17 @@ static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_
}
}
int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
int32_t streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
const char* id = pTask->id.idStr;
int32_t code = TSDB_CODE_SUCCESS;
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SINK) {
int32_t remain = streamAlignTransferState(pTask);
if (remain > 0) {
streamFreeQitem((SStreamQueueItem*)pBlock);
stDebug("s-task:%s receive upstream transfer state msg, remain:%d", id, remain);
stDebug("s-task:%s receive upstream trans-state msg, not sent remain:%d", id, remain);
return 0;
}
}
@ -536,7 +551,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
}
} else { // non-dispatch task, do task state transfer directly
streamFreeQitem((SStreamQueueItem*)pBlock);
stDebug("s-task:%s non-dispatch task, level:%d start to transfer state directly", id, pTask->info.taskLevel);
stDebug("s-task:%s non-dispatch task, level:%d start to transfer state directly", id, level);
ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStateToStreamTask(pTask);
@ -604,7 +619,7 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
}
if (type == STREAM_INPUT__TRANS_STATE) {
streamProcessTranstateBlock(pTask, (SStreamDataBlock*)pInput);
streamProcessTransstateBlock(pTask, (SStreamDataBlock*)pInput);
continue;
}

View File

@ -1520,8 +1520,6 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, taskId);
streamMetaUpdateTaskDownstreamStatus(pMeta, streamId, taskId, 0, taosGetTimestampMs(), false);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}

View File

@ -147,8 +147,6 @@ const char* streamQueueItemGetTypeStr(int32_t type) {
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
int32_t* blockSize) {
int32_t retryTimes = 0;
int32_t MAX_RETRY_TIMES = 5;
const char* id = pTask->id.idStr;
int32_t taskLevel = pTask->info.taskLevel;

View File

@ -1029,6 +1029,11 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
}
} else {
ASSERT(pTask->info.fillHistory == 0);
if (pTask->info.taskLevel >= TASK_LEVEL__AGG) {
return;
}
int64_t ekey = 0;
if (pRange->window.ekey < INT64_MAX) {
ekey = pRange->window.ekey + 1;
@ -1043,10 +1048,13 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
pRange->range.minVer = 0;
pRange->range.maxVer = ver;
stDebug("s-task:%s level:%d related fill-history task exists, update stream calc time window:%" PRId64 " - %" PRId64
stDebug("s-task:%s level:%d related fill-history task exists, set stream task timeWindow:%" PRId64 " - %" PRId64
", verRang:%" PRId64 " - %" PRId64,
pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer,
pRange->range.maxVer);
pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, ver, INT64_MAX);
SVersionRange verRange = {.minVer = ver, .maxVer = INT64_MAX};
STimeWindow win = pRange->window;
streamSetParamForStreamScannerStep2(pTask, &verRange, &win);
}
}

View File

@ -48,6 +48,14 @@ int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg) {
}
return code;
}
int32_t tmsgSendSyncReq(const SEpSet* epSet, SRpcMsg* pMsg) {
int32_t code = (*defaultMsgCb.sendSyncReqFp)(epSet, pMsg);
if (code != 0) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}
return code;
}
void tmsgSendRsp(SRpcMsg* pMsg) {
#if 1

View File

@ -1907,7 +1907,12 @@ bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
static void* cliWorkThread(void* arg) {
SCliThrd* pThrd = (SCliThrd*)arg;
pThrd->pid = taosGetSelfPthreadId();
setThreadName("trans-cli-work");
char threadName[TSDB_LABEL_LEN] = {0};
STrans* pInst = pThrd->pTransInst;
strtolower(threadName, pInst->label);
setThreadName(threadName);
uv_run(pThrd->loop, UV_RUN_DEFAULT);
tDebug("thread quit-thread:%08" PRId64, pThrd->pid);
@ -2701,6 +2706,7 @@ int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STr
ret = TSDB_CODE_TIMEOUT_ERROR;
} else {
memcpy(pRsp, pSyncMsg->pRsp, sizeof(STransMsg));
pSyncMsg->pRsp->pCont = NULL;
if (pSyncMsg->hasEpSet == 1) {
epsetAssign(pEpSet, &pSyncMsg->epSet);
*epUpdated = 1;

View File

@ -667,7 +667,7 @@ void transDestroySyncMsg(void* msg) {
STransSyncMsg* pSyncMsg = msg;
tsem_destroy(pSyncMsg->pSem);
taosMemoryFree(pSyncMsg->pSem);
transFreeMsg(pSyncMsg->pRsp->pCont);
taosMemoryFree(pSyncMsg->pRsp);
taosMemoryFree(pSyncMsg);
}

View File

@ -36,6 +36,7 @@
#,,n,system-test,python3 ./test.py -f 8-stream/snode_restart_with_checkpoint.py -N 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname_vgroup.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_interval.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/compact-col.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py -Q 2

View File

@ -13,25 +13,13 @@ class TDTestCase:
tdSql.init(conn.cursor(), True)
def run(self):
"""
timestamp输入插入规则
对于插入的字段类型为timestamp类型的字段只允许这么几种情况
timestamp
timestamp +/- interval
interval + timestamp
timestamp可以是字符串譬如"2023-12-05 00:00:00.000", 也可以是int型 譬如1701619200000
interval支持b, u, a, s, m, h, d, w 不支持n, y,譬如1h, 2d
仅支持2元表达式譬如timestamp + 2h 不支持2元以上表达譬如timestamp + 2h + 1d
"""
tdSql.execute("create database test_insert_timestamp PRECISION 'ns';")
tdSql.execute("use test_insert_timestamp;")
tdSql.execute("create stable st(ts timestamp, c1 int) tags(id int);")
tdSql.execute("create table test_t using st tags(1);")
expectErrInfo = "syntax error"
# 异常场景:timestamp + timestamp
# abnormal scenario: timestamp + timestamp
tdSql.error("insert into test_t values(now + today(), 1 );", expectErrInfo=expectErrInfo, fullMatched=False)
tdSql.error("insert into test_t values(now - today(), 1 );", expectErrInfo=expectErrInfo, fullMatched=False)
tdSql.error("insert into test_t values(today() + now(), 1 ); ", expectErrInfo=expectErrInfo, fullMatched=False)
@ -40,24 +28,24 @@ class TDTestCase:
tdSql.error("insert into test_t values('2023-11-28 00:00:00.000' + 1701111600000, 1 ); ", expectErrInfo=expectErrInfo, fullMatched=False)
tdSql.error("insert into test_t values(1701111500000 + 1701111600000, 1 ); ", expectErrInfo=expectErrInfo, fullMatched=False)
# 异常场景:timestamp + interval + interval
# abnormal scenario: timestamp + interval + interval
tdSql.error("insert into test_t values(today() + 1d + 1s, 1);", expectErrInfo=expectErrInfo, fullMatched=False)
# 异常场景:interval - timestamp
# abnormal scenario: interval - timestamp
tdSql.error("insert into test_t values(2h - now(), 1 ); ", expectErrInfo=expectErrInfo, fullMatched=False)
tdSql.error("insert into test_t values(2h - today(), 1 ); ", expectErrInfo=expectErrInfo, fullMatched=False)
# 异常场景:interval + interval
# abnormal scenario: interval + interval
tdSql.error("insert into test_t values(2h - 1h, 1 ); ", expectErrInfo=expectErrInfo, fullMatched=False)
tdSql.error("insert into test_t values(2h + 1h, 1 ); ", expectErrInfo=expectErrInfo, fullMatched=False)
# 异常场景非法interval类型n
# abnormal scenario: non-support datatype - n
tdSql.error("insert into test_t values(today() + 2n, 7); ", expectErrInfo=expectErrInfo, fullMatched=False)
# 异常场景非法interval类型y
# abnormal scenario: non-support datatype - y
tdSql.error("insert into test_t values(today() - 2y, 8);", expectErrInfo=expectErrInfo, fullMatched=False)
# 异常场景:数据类型不对
# abnormal scenario: non-support datatype
tdSql.error("insert into test_t values('a1701619200000', 8);", expectErrInfo=expectErrInfo, fullMatched=False)
tdSql.error("insert into test_t values('ss2023-12-05 00:00:00.000' + '1701619200000', 1);", expectErrInfo=expectErrInfo, fullMatched=False)
tdSql.error("insert into test_t values(123456, 1);", expectErrInfo="Timestamp data out of range")
@ -66,31 +54,31 @@ class TDTestCase:
tdSql.error("insert into test_t values(None, 1);", expectErrInfo=expectErrInfo, fullMatched=False)
tdSql.error("insert into test_t values(null, 1);", expectErrInfo=expectErrInfo, fullMatched=False)
# 异常场景:格式不对
# abnormal scenario: incorrect format
tdSql.error("insert into test_t values('2023-122-05 00:00:00.000' + '1701619200000', 1);", expectErrInfo=expectErrInfo, fullMatched=False)
tdSql.error("insert into test_t values('2023-12--05 00:00:00.000' + '1701619200000', 1);", expectErrInfo=expectErrInfo, fullMatched=False)
tdSql.error("insert into test_t values('12/12/2023' + 10a, 1);", expectErrInfo=expectErrInfo, fullMatched=False)
tdSql.error("insert into test_t values(1701619200000111, 1);", expectErrInfo="Timestamp data out of range", fullMatched=False)
# 正常场景:timestamp + interval
# normal scenario:timestamp + interval
tdSql.execute("insert into test_t values(today() + 2b, 1);")
tdSql.execute("insert into test_t values(1701619200000000000 + 2u, 2);")
tdSql.execute("insert into test_t values(today + 2a, 3);")
tdSql.execute("insert into test_t values('2023-12-05 23:59:59.999' + 2a, 4);")
tdSql.execute("insert into test_t values(1701921599000000000 + 3a, 5);")
# 正常场景:timestamp - interval
# normal scenario:timestamp - interval
tdSql.execute("insert into test_t values(today() - 2s, 6);")
tdSql.execute("insert into test_t values(now() - 2m, 7);")
tdSql.execute("insert into test_t values(today - 2h, 8);")
tdSql.execute("insert into test_t values('2023-12-05 00:00:00.000000000' - 2a, 9);")
tdSql.execute("insert into test_t values(1701669000000000000 - 2a, 10);")
# 正常场景:interval + timestamp
# normal scenario:interval + timestamp
tdSql.execute("insert into test_t values(2d + now, 11);")
tdSql.execute("insert into test_t values(2w + today, 12);")
# 正常场景:timestamp
# normal scenario:timestamp
tdSql.execute("insert into test_t values('2023-12-05 00:00:00.000', 13);")
tdSql.execute("insert into test_t values(1701629100000000000, 14);")
tdSql.execute("insert into test_t values(now() + 2s, 15);")
@ -102,7 +90,7 @@ class TDTestCase:
tdSql.execute("insert into test_t values(1701619200000000000, -5);")
tdSql.execute("insert into test_t values('2023-12-05 12:12:12' + 10a, 19);")
# 验证数据
# data verification
tdSql.query(f'select ts,c1 from test_t order by c1;')
tdSql.checkRows(22)
tdSql.checkEqual(tdSql.queryResult[0][0], 1699977600000000000) # c1=-15
@ -133,12 +121,10 @@ class TDTestCase:
tdSql.execute("drop database if exists test_insert_timestamp;")
def __convert_ts_to_date(self, ts: int) -> str:
# 创建datetime对象并进行转换
dt_object = datetime.datetime.fromtimestamp(ts / 1e9)
# 格式化日期字符串
formatted_date = dt_object.strftime('%Y-%m-%d')
# print("转换后的日期为:", formatted_date)
return formatted_date
def __get_today_ts(self) -> int:

View File

@ -0,0 +1,60 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
from util.dnodes import tdDnodes
import random
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def restartTaosd(self, index=1, dbname="db"):
tdDnodes.stop(index)
tdDnodes.startWithoutSleep(index)
tdSql.execute(f"use d")
def run(self):
tdSql.execute("drop database if exists d");
tdSql.execute("create database d");
tdSql.execute("use d");
tdSql.execute("create table st(ts timestamp, f int) tags (t int)")
for i in range(-2048, 2047):
ts = 1626624000000 + i;
tdSql.execute(f"insert into ct1 using st tags(1) values({ts}, {i})")
tdSql.execute("flush database d")
for i in range(1638):
ts = 1648758398208 + i
tdSql.execute(f"insert into ct1 using st tags(1) values({ts}, {i})")
tdSql.execute("insert into ct1 using st tags(1) values(1648970742528, 1638)")
tdSql.execute("flush database d")
tdSql.query("select count(ts) from ct1 interval(17n, 5n)")
self.restartTaosd()
tdSql.query("select count(ts) from ct1 interval(17n, 5n)")
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())