Merge pull request #28111 from taosdata/enh/TD-31890-19
ehn: remove void
This commit is contained in:
commit
8c1473d2d8
|
@ -72,8 +72,8 @@ extern "C" {
|
||||||
#ifdef TD_TSZ
|
#ifdef TD_TSZ
|
||||||
extern bool lossyFloat;
|
extern bool lossyFloat;
|
||||||
extern bool lossyDouble;
|
extern bool lossyDouble;
|
||||||
int32_t tsCompressInit(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
|
void tsCompressInit(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals, uint32_t intervals,
|
||||||
uint32_t intervals, int32_t ifAdtFse, const char *compressor);
|
int32_t ifAdtFse, const char *compressor);
|
||||||
|
|
||||||
void tsCompressExit();
|
void tsCompressExit();
|
||||||
|
|
||||||
|
|
|
@ -81,11 +81,21 @@ static void dmSetAssert(int32_t signum, void *sigInfo, void *context) { tsAssert
|
||||||
static void dmStopDnode(int signum, void *sigInfo, void *context) {
|
static void dmStopDnode(int signum, void *sigInfo, void *context) {
|
||||||
// taosIgnSignal(SIGUSR1);
|
// taosIgnSignal(SIGUSR1);
|
||||||
// taosIgnSignal(SIGUSR2);
|
// taosIgnSignal(SIGUSR2);
|
||||||
(void)taosIgnSignal(SIGTERM);
|
if (taosIgnSignal(SIGTERM) != 0) {
|
||||||
(void)taosIgnSignal(SIGHUP);
|
dWarn("failed to ignore signal SIGTERM");
|
||||||
(void)taosIgnSignal(SIGINT);
|
}
|
||||||
(void)taosIgnSignal(SIGABRT);
|
if (taosIgnSignal(SIGHUP) != 0) {
|
||||||
(void)taosIgnSignal(SIGBREAK);
|
dWarn("failed to ignore signal SIGHUP");
|
||||||
|
}
|
||||||
|
if (taosIgnSignal(SIGINT) != 0) {
|
||||||
|
dWarn("failed to ignore signal SIGINT");
|
||||||
|
}
|
||||||
|
if (taosIgnSignal(SIGABRT) != 0) {
|
||||||
|
dWarn("failed to ignore signal SIGABRT");
|
||||||
|
}
|
||||||
|
if (taosIgnSignal(SIGBREAK) != 0) {
|
||||||
|
dWarn("failed to ignore signal SIGBREAK");
|
||||||
|
}
|
||||||
|
|
||||||
dInfo("shut down signal is %d", signum);
|
dInfo("shut down signal is %d", signum);
|
||||||
#ifndef WINDOWS
|
#ifndef WINDOWS
|
||||||
|
@ -103,11 +113,19 @@ void dmLogCrash(int signum, void *sigInfo, void *context) {
|
||||||
// taosIgnSignal(SIGBREAK);
|
// taosIgnSignal(SIGBREAK);
|
||||||
|
|
||||||
#ifndef WINDOWS
|
#ifndef WINDOWS
|
||||||
(void)taosIgnSignal(SIGBUS);
|
if (taosIgnSignal(SIGBUS) != 0) {
|
||||||
|
dWarn("failed to ignore signal SIGBUS");
|
||||||
|
}
|
||||||
#endif
|
#endif
|
||||||
(void)taosIgnSignal(SIGABRT);
|
if (taosIgnSignal(SIGABRT) != 0) {
|
||||||
(void)taosIgnSignal(SIGFPE);
|
dWarn("failed to ignore signal SIGABRT");
|
||||||
(void)taosIgnSignal(SIGSEGV);
|
}
|
||||||
|
if (taosIgnSignal(SIGFPE) != 0) {
|
||||||
|
dWarn("failed to ignore signal SIGABRT");
|
||||||
|
}
|
||||||
|
if (taosIgnSignal(SIGSEGV) != 0) {
|
||||||
|
dWarn("failed to ignore signal SIGABRT");
|
||||||
|
}
|
||||||
|
|
||||||
char *pMsg = NULL;
|
char *pMsg = NULL;
|
||||||
const char *flags = "UTL FATAL ";
|
const char *flags = "UTL FATAL ";
|
||||||
|
@ -136,24 +154,31 @@ _return:
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dmSetSignalHandle() {
|
static void dmSetSignalHandle() {
|
||||||
(void)taosSetSignal(SIGUSR1, dmSetDebugFlag);
|
if (taosSetSignal(SIGUSR1, dmSetDebugFlag) != 0) {
|
||||||
(void)taosSetSignal(SIGUSR2, dmSetAssert);
|
dWarn("failed to set signal SIGUSR1");
|
||||||
(void)taosSetSignal(SIGTERM, dmStopDnode);
|
}
|
||||||
(void)taosSetSignal(SIGHUP, dmStopDnode);
|
if (taosSetSignal(SIGUSR2, dmSetAssert) != 0) {
|
||||||
(void)taosSetSignal(SIGINT, dmStopDnode);
|
dWarn("failed to set signal SIGUSR1");
|
||||||
(void)taosSetSignal(SIGBREAK, dmStopDnode);
|
}
|
||||||
|
if (taosSetSignal(SIGTERM, dmStopDnode) != 0) {
|
||||||
|
dWarn("failed to set signal SIGUSR1");
|
||||||
|
}
|
||||||
|
if (taosSetSignal(SIGHUP, dmStopDnode) != 0) {
|
||||||
|
dWarn("failed to set signal SIGUSR1");
|
||||||
|
}
|
||||||
|
if (taosSetSignal(SIGINT, dmStopDnode) != 0) {
|
||||||
|
dWarn("failed to set signal SIGUSR1");
|
||||||
|
}
|
||||||
|
if (taosSetSignal(SIGBREAK, dmStopDnode) != 0) {
|
||||||
|
dWarn("failed to set signal SIGUSR1");
|
||||||
|
}
|
||||||
#ifndef WINDOWS
|
#ifndef WINDOWS
|
||||||
(void)taosSetSignal(SIGTSTP, dmStopDnode);
|
if (taosSetSignal(SIGTSTP, dmStopDnode) != 0) {
|
||||||
(void)taosSetSignal(SIGQUIT, dmStopDnode);
|
dWarn("failed to set signal SIGUSR1");
|
||||||
#endif
|
}
|
||||||
|
if (taosSetSignal(SIGQUIT, dmStopDnode) != 0) {
|
||||||
#if 0
|
dWarn("failed to set signal SIGUSR1");
|
||||||
#ifndef WINDOWS
|
}
|
||||||
(void)taosSetSignal(SIGBUS, dmLogCrash);
|
|
||||||
#endif
|
|
||||||
(void)taosSetSignal(SIGABRT, dmLogCrash);
|
|
||||||
(void)taosSetSignal(SIGFPE, dmLogCrash);
|
|
||||||
(void)taosSetSignal(SIGSEGV, dmLogCrash);
|
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -45,7 +45,9 @@ static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
|
||||||
if (pMgmt->pData->ipWhiteVer == ver) {
|
if (pMgmt->pData->ipWhiteVer == ver) {
|
||||||
if (ver == 0) {
|
if (ver == 0) {
|
||||||
dDebug("disable ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64 "", pMgmt->pData->ipWhiteVer, ver);
|
dDebug("disable ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64 "", pMgmt->pData->ipWhiteVer, ver);
|
||||||
(void)rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL);
|
if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
|
||||||
|
dError("failed to disable ip white list on dnode");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -91,7 +93,9 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
|
||||||
dGInfo("dnode:%d, set to dropped since not exist in mnode, statusSeq:%d", pMgmt->pData->dnodeId,
|
dGInfo("dnode:%d, set to dropped since not exist in mnode, statusSeq:%d", pMgmt->pData->dnodeId,
|
||||||
pMgmt->statusSeq);
|
pMgmt->statusSeq);
|
||||||
pMgmt->pData->dropped = 1;
|
pMgmt->pData->dropped = 1;
|
||||||
(void)dmWriteEps(pMgmt->pData);
|
if (dmWriteEps(pMgmt->pData) != 0) {
|
||||||
|
dError("failed to write dnode file");
|
||||||
|
}
|
||||||
dInfo("dnode will exit since it is in the dropped state");
|
dInfo("dnode will exit since it is in the dropped state");
|
||||||
(void)raise(SIGINT);
|
(void)raise(SIGINT);
|
||||||
}
|
}
|
||||||
|
@ -147,7 +151,9 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
|
||||||
req.clusterCfg.monitorParas.tsSlowLogThresholdTest = tsSlowLogThresholdTest;
|
req.clusterCfg.monitorParas.tsSlowLogThresholdTest = tsSlowLogThresholdTest;
|
||||||
tstrncpy(req.clusterCfg.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
|
tstrncpy(req.clusterCfg.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
|
||||||
char timestr[32] = "1970-01-01 00:00:00.00";
|
char timestr[32] = "1970-01-01 00:00:00.00";
|
||||||
(void)taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
|
if (taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0) != 0) {
|
||||||
|
dError("failed to parse time since %s", tstrerror(code));
|
||||||
|
}
|
||||||
memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
|
memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
|
||||||
memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
|
memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
|
||||||
memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);
|
memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);
|
||||||
|
@ -243,7 +249,9 @@ void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
|
||||||
|
|
||||||
SEpSet epSet = {0};
|
SEpSet epSet = {0};
|
||||||
dmGetMnodeEpSet(pMgmt->pData, &epSet);
|
dmGetMnodeEpSet(pMgmt->pData, &epSet);
|
||||||
(void)rpcSendRequest(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, NULL);
|
if (rpcSendRequest(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, NULL) != 0) {
|
||||||
|
dError("failed to send notify req");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
|
|
|
@ -305,11 +305,16 @@ int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt) {
|
||||||
|
|
||||||
void dmStopNotifyThread(SDnodeMgmt *pMgmt) {
|
void dmStopNotifyThread(SDnodeMgmt *pMgmt) {
|
||||||
if (taosCheckPthreadValid(pMgmt->notifyThread)) {
|
if (taosCheckPthreadValid(pMgmt->notifyThread)) {
|
||||||
(void)tsem_post(&dmNotifyHdl.sem);
|
if (tsem_post(&dmNotifyHdl.sem) != 0) {
|
||||||
|
dError("failed to post notify sem");
|
||||||
|
}
|
||||||
|
|
||||||
(void)taosThreadJoin(pMgmt->notifyThread, NULL);
|
(void)taosThreadJoin(pMgmt->notifyThread, NULL);
|
||||||
taosThreadClear(&pMgmt->notifyThread);
|
taosThreadClear(&pMgmt->notifyThread);
|
||||||
}
|
}
|
||||||
(void)tsem_destroy(&dmNotifyHdl.sem);
|
if (tsem_destroy(&dmNotifyHdl.sem) != 0) {
|
||||||
|
dError("failed to destroy notify sem");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
|
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
|
||||||
|
|
|
@ -17,7 +17,9 @@
|
||||||
#include "mmInt.h"
|
#include "mmInt.h"
|
||||||
|
|
||||||
void mmGetMonitorInfo(SMnodeMgmt *pMgmt, SMonMmInfo *pInfo) {
|
void mmGetMonitorInfo(SMnodeMgmt *pMgmt, SMonMmInfo *pInfo) {
|
||||||
(void)mndGetMonitorInfo(pMgmt->pMnode, &pInfo->cluster, &pInfo->vgroup, &pInfo->stb, &pInfo->grant);
|
if (mndGetMonitorInfo(pMgmt->pMnode, &pInfo->cluster, &pInfo->vgroup, &pInfo->stb, &pInfo->grant) != 0) {
|
||||||
|
dError("failed to get monitor info");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) {
|
void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) {
|
||||||
|
|
|
@ -23,7 +23,7 @@ static inline void qmSendRsp(SRpcMsg *pMsg, int32_t code) {
|
||||||
.contLen = pMsg->info.rspLen,
|
.contLen = pMsg->info.rspLen,
|
||||||
.info = pMsg->info,
|
.info = pMsg->info,
|
||||||
};
|
};
|
||||||
(void)tmsgSendRsp(&rsp);
|
tmsgSendRsp(&rsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void qmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
static void qmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
|
|
|
@ -23,15 +23,15 @@ static inline void smSendRsp(SRpcMsg *pMsg, int32_t code) {
|
||||||
.contLen = pMsg->info.rspLen,
|
.contLen = pMsg->info.rspLen,
|
||||||
.info = pMsg->info,
|
.info = pMsg->info,
|
||||||
};
|
};
|
||||||
(void)tmsgSendRsp(&rsp);
|
tmsgSendRsp(&rsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void smProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
static void smProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||||
SSnodeMgmt *pMgmt = pInfo->ahandle;
|
SSnodeMgmt *pMgmt = pInfo->ahandle;
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfMsgs; i++) {
|
for (int32_t i = 0; i < numOfMsgs; i++) {
|
||||||
SRpcMsg *pMsg = NULL;
|
SRpcMsg *pMsg = NULL;
|
||||||
(void)taosGetQitem(qall, (void **)&pMsg);
|
int32_t num = taosGetQitem(qall, (void **)&pMsg);
|
||||||
const STraceId *trace = &pMsg->info.traceId;
|
const STraceId *trace = &pMsg->info.traceId;
|
||||||
|
|
||||||
dTrace("msg:%p, get from snode-write queue", pMsg);
|
dTrace("msg:%p, get from snode-write queue", pMsg);
|
||||||
|
|
|
@ -35,10 +35,14 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
|
||||||
SVnodeObj *pVnode = *ppVnode;
|
SVnodeObj *pVnode = *ppVnode;
|
||||||
SVnodeLoad vload = {.vgId = pVnode->vgId};
|
SVnodeLoad vload = {.vgId = pVnode->vgId};
|
||||||
if (!pVnode->failed) {
|
if (!pVnode->failed) {
|
||||||
(void)vnodeGetLoad(pVnode->pImpl, &vload);
|
if (vnodeGetLoad(pVnode->pImpl, &vload) != 0) {
|
||||||
|
dError("failed to get vnode load");
|
||||||
|
}
|
||||||
if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
|
if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
|
||||||
}
|
}
|
||||||
(void)taosArrayPush(pInfo->pVloads, &vload);
|
if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
|
||||||
|
dError("failed to push vnode load");
|
||||||
|
}
|
||||||
pIter = taosHashIterate(pMgmt->hash, pIter);
|
pIter = taosHashIterate(pMgmt->hash, pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -116,7 +120,9 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
|
||||||
pMgmt->state.numOfBatchInsertReqs = numOfBatchInsertReqs;
|
pMgmt->state.numOfBatchInsertReqs = numOfBatchInsertReqs;
|
||||||
pMgmt->state.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;
|
pMgmt->state.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;
|
||||||
|
|
||||||
(void)tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs);
|
if (tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs) != 0) {
|
||||||
|
dError("failed to get tfs monitor info");
|
||||||
|
}
|
||||||
taosArrayDestroy(pVloads);
|
taosArrayDestroy(pVloads);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -845,7 +851,9 @@ int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
vmCloseVnode(pMgmt, pVnode, false);
|
vmCloseVnode(pMgmt, pVnode, false);
|
||||||
(void)vmWriteVnodeListToFile(pMgmt);
|
if (vmWriteVnodeListToFile(pMgmt) != 0) {
|
||||||
|
dError("vgId:%d, failed to write vnode list since %s", vgId, terrstr());
|
||||||
|
}
|
||||||
|
|
||||||
dInfo("vgId:%d, is dropped", vgId);
|
dInfo("vgId:%d, is dropped", vgId);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -233,8 +233,12 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
|
||||||
|
|
||||||
if (commitAndRemoveWal) {
|
if (commitAndRemoveWal) {
|
||||||
dInfo("vgId:%d, commit data for vnode split", pVnode->vgId);
|
dInfo("vgId:%d, commit data for vnode split", pVnode->vgId);
|
||||||
(void)vnodeSyncCommit(pVnode->pImpl);
|
if (vnodeSyncCommit(pVnode->pImpl) != 0) {
|
||||||
(void)vnodeBegin(pVnode->pImpl);
|
dError("vgId:%d, failed to commit data", pVnode->vgId);
|
||||||
|
}
|
||||||
|
if (vnodeBegin(pVnode->pImpl) != 0) {
|
||||||
|
dError("vgId:%d, failed to begin", pVnode->vgId);
|
||||||
|
}
|
||||||
dInfo("vgId:%d, commit data finished", pVnode->vgId);
|
dInfo("vgId:%d, commit data finished", pVnode->vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -248,8 +252,12 @@ _closed:
|
||||||
if (commitAndRemoveWal) {
|
if (commitAndRemoveWal) {
|
||||||
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP);
|
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP);
|
||||||
dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path);
|
dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path);
|
||||||
(void)tfsRmdir(pMgmt->pTfs, path);
|
if (tfsRmdir(pMgmt->pTfs, path) != 0) {
|
||||||
(void)tfsMkdir(pMgmt->pTfs, path);
|
dTrace("vgId:%d, failed to remove wals, path:%s", pVnode->vgId, path);
|
||||||
|
}
|
||||||
|
if (tfsMkdir(pMgmt->pTfs, path) != 0) {
|
||||||
|
dTrace("vgId:%d, failed to create wals, path:%s", pVnode->vgId, path);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVnode->dropped) {
|
if (pVnode->dropped) {
|
||||||
|
|
|
@ -187,7 +187,9 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
|
||||||
static void vmSendResponse(SRpcMsg *pMsg) {
|
static void vmSendResponse(SRpcMsg *pMsg) {
|
||||||
if (pMsg->info.handle) {
|
if (pMsg->info.handle) {
|
||||||
SRpcMsg rsp = {.info = pMsg->info, .code = terrno};
|
SRpcMsg rsp = {.info = pMsg->info, .code = terrno};
|
||||||
(void)rpcSendResponse(&rsp);
|
if (rpcSendResponse(&rsp) != 0) {
|
||||||
|
dError("failed to send response since %s", terrstr());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -389,10 +391,28 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||||
SMultiWorkerCfg scfg = {.max = 1, .name = "vnode-sync", .fp = (FItems)vmProcessSyncQueue, .param = pVnode};
|
SMultiWorkerCfg scfg = {.max = 1, .name = "vnode-sync", .fp = (FItems)vmProcessSyncQueue, .param = pVnode};
|
||||||
SMultiWorkerCfg sccfg = {.max = 1, .name = "vnode-sync-rd", .fp = (FItems)vmProcessSyncQueue, .param = pVnode};
|
SMultiWorkerCfg sccfg = {.max = 1, .name = "vnode-sync-rd", .fp = (FItems)vmProcessSyncQueue, .param = pVnode};
|
||||||
SMultiWorkerCfg acfg = {.max = 1, .name = "vnode-apply", .fp = (FItems)vnodeApplyWriteMsg, .param = pVnode->pImpl};
|
SMultiWorkerCfg acfg = {.max = 1, .name = "vnode-apply", .fp = (FItems)vnodeApplyWriteMsg, .param = pVnode->pImpl};
|
||||||
(void)tMultiWorkerInit(&pVnode->pWriteW, &wcfg);
|
code = tMultiWorkerInit(&pVnode->pWriteW, &wcfg);
|
||||||
(void)tMultiWorkerInit(&pVnode->pSyncW, &scfg);
|
if (code) {
|
||||||
(void)tMultiWorkerInit(&pVnode->pSyncRdW, &sccfg);
|
return code;
|
||||||
(void)tMultiWorkerInit(&pVnode->pApplyW, &acfg);
|
}
|
||||||
|
code = tMultiWorkerInit(&pVnode->pSyncW, &scfg);
|
||||||
|
if (code) {
|
||||||
|
tMultiWorkerCleanup(&pVnode->pWriteW);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
code = tMultiWorkerInit(&pVnode->pSyncRdW, &sccfg);
|
||||||
|
if (code) {
|
||||||
|
tMultiWorkerCleanup(&pVnode->pWriteW);
|
||||||
|
tMultiWorkerCleanup(&pVnode->pSyncW);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
code = tMultiWorkerInit(&pVnode->pApplyW, &acfg);
|
||||||
|
if (code) {
|
||||||
|
tMultiWorkerCleanup(&pVnode->pWriteW);
|
||||||
|
tMultiWorkerCleanup(&pVnode->pSyncW);
|
||||||
|
tMultiWorkerCleanup(&pVnode->pSyncRdW);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
pVnode->pQueryQ = tQueryAutoQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
|
pVnode->pQueryQ = tQueryAutoQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
|
||||||
pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue);
|
pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue);
|
||||||
|
|
|
@ -47,8 +47,14 @@ static int32_t dmCheckRepeatInit(SDnode *pDnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dmInitSystem() {
|
static int32_t dmInitSystem() {
|
||||||
(void)taosIgnSIGPIPE();
|
if (taosIgnSIGPIPE() != 0) {
|
||||||
(void)taosBlockSIGPIPE();
|
dError("failed to ignore SIGPIPE");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosBlockSIGPIPE() != 0) {
|
||||||
|
dError("failed to block SIGPIPE");
|
||||||
|
}
|
||||||
|
|
||||||
taosResolveCRC();
|
taosResolveCRC();
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -204,7 +210,9 @@ void dmCleanup() {
|
||||||
auditCleanup();
|
auditCleanup();
|
||||||
syncCleanUp();
|
syncCleanUp();
|
||||||
walCleanUp();
|
walCleanUp();
|
||||||
(void)udfcClose();
|
if (udfcClose() != 0) {
|
||||||
|
dError("failed to close udfc");
|
||||||
|
}
|
||||||
udfStopUdfd();
|
udfStopUdfd();
|
||||||
taosStopCacheRefreshWorker();
|
taosStopCacheRefreshWorker();
|
||||||
(void)dmDiskClose();
|
(void)dmDiskClose();
|
||||||
|
|
|
@ -47,8 +47,7 @@ int32_t dmInitDnode(SDnode *pDnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// compress module init
|
// compress module init
|
||||||
(void)tsCompressInit(tsLossyColumns, tsFPrecision, tsDPrecision, tsMaxRange, tsCurRange, (int)tsIfAdtFse,
|
tsCompressInit(tsLossyColumns, tsFPrecision, tsDPrecision, tsMaxRange, tsCurRange, (int)tsIfAdtFse, tsCompressor);
|
||||||
tsCompressor);
|
|
||||||
|
|
||||||
pDnode->wrappers[DNODE].func = dmGetMgmtFunc();
|
pDnode->wrappers[DNODE].func = dmGetMgmtFunc();
|
||||||
pDnode->wrappers[MNODE].func = mmGetMgmtFunc();
|
pDnode->wrappers[MNODE].func = mmGetMgmtFunc();
|
||||||
|
@ -226,7 +225,10 @@ void dmClearVars(SDnode *pDnode) {
|
||||||
(void)taosThreadRwlockDestroy(&pWrapper->lock);
|
(void)taosThreadRwlockDestroy(&pWrapper->lock);
|
||||||
}
|
}
|
||||||
if (pDnode->lockfile != NULL) {
|
if (pDnode->lockfile != NULL) {
|
||||||
(void)taosUnLockFile(pDnode->lockfile);
|
if (taosUnLockFile(pDnode->lockfile) != 0) {
|
||||||
|
dError("failed to unlock file");
|
||||||
|
}
|
||||||
|
|
||||||
(void)taosCloseFile(&pDnode->lockfile);
|
(void)taosCloseFile(&pDnode->lockfile);
|
||||||
pDnode->lockfile = NULL;
|
pDnode->lockfile = NULL;
|
||||||
}
|
}
|
||||||
|
@ -343,7 +345,9 @@ void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||||
rsp.contLen = pMsg->contLen;
|
rsp.contLen = pMsg->contLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)rpcSendResponse(&rsp);
|
if (rpcSendResponse(&rsp) != 0) {
|
||||||
|
dError("failed to send response, msg:%p", &rsp);
|
||||||
|
}
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -360,11 +364,16 @@ void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||||
} else {
|
} else {
|
||||||
rsp.pCont = rpcMallocCont(contLen);
|
rsp.pCont = rpcMallocCont(contLen);
|
||||||
if (rsp.pCont != NULL) {
|
if (rsp.pCont != NULL) {
|
||||||
(void)tSerializeSServerStatusRsp(rsp.pCont, contLen, &statusRsp);
|
if (tSerializeSServerStatusRsp(rsp.pCont, contLen, &statusRsp) < 0) {
|
||||||
rsp.contLen = contLen;
|
rsp.code = TSDB_CODE_APP_ERROR;
|
||||||
|
} else {
|
||||||
|
rsp.contLen = contLen;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)rpcSendResponse(&rsp);
|
if (rpcSendResponse(&rsp) != 0) {
|
||||||
|
dError("failed to send response, msg:%p", &rsp);
|
||||||
|
}
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,11 @@
|
||||||
#include "qworker.h"
|
#include "qworker.h"
|
||||||
#include "tversion.h"
|
#include "tversion.h"
|
||||||
|
|
||||||
static inline void dmSendRsp(SRpcMsg *pMsg) { (void)rpcSendResponse(pMsg); }
|
static inline void dmSendRsp(SRpcMsg *pMsg) {
|
||||||
|
if (rpcSendResponse(pMsg) != 0) {
|
||||||
|
dError("failed to send response, msg:%p", pMsg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
|
static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||||
SEpSet epSet = {0};
|
SEpSet epSet = {0};
|
||||||
|
@ -113,7 +117,11 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);
|
pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);
|
||||||
|
|
||||||
int32_t svrVer = 0;
|
int32_t svrVer = 0;
|
||||||
(void)taosVersionStrToInt(version, &svrVer);
|
code = taosVersionStrToInt(version, &svrVer);
|
||||||
|
if (code != 0) {
|
||||||
|
dError("failed to convert version string:%s to int, code:%d", version, code);
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
if ((code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) != 0) {
|
if ((code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) != 0) {
|
||||||
dError("Version not compatible, cli ver: %d, svr ver: %d, ip:0x%x", pRpc->info.cliVer, svrVer,
|
dError("Version not compatible, cli ver: %d, svr ver: %d, ip:0x%x", pRpc->info.cliVer, svrVer,
|
||||||
pRpc->info.conn.clientIp);
|
pRpc->info.conn.clientIp);
|
||||||
|
@ -253,7 +261,9 @@ _OVER:
|
||||||
if (pWrapper != NULL) {
|
if (pWrapper != NULL) {
|
||||||
dmSendRsp(&rsp);
|
dmSendRsp(&rsp);
|
||||||
} else {
|
} else {
|
||||||
(void)rpcSendResponse(&rsp);
|
if (rpcSendResponse(&rsp) != 0) {
|
||||||
|
dError("failed to send response, msg:%p", &rsp);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -310,7 +320,9 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||||
return code;
|
return code;
|
||||||
} else {
|
} else {
|
||||||
pMsg->info.handle = 0;
|
pMsg->info.handle = 0;
|
||||||
(void)rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL);
|
if (rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL) != 0) {
|
||||||
|
dError("failed to send rpc msg");
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -396,7 +408,9 @@ int32_t dmInitClient(SDnode *pDnode) {
|
||||||
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
|
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
|
||||||
rpcInit.notWaitAvaliableConn = 0;
|
rpcInit.notWaitAvaliableConn = 0;
|
||||||
|
|
||||||
(void)taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
if (taosVersionStrToInt(version, &(rpcInit.compatibilityVer)) != 0) {
|
||||||
|
dError("failed to convert version string:%s to int", version);
|
||||||
|
}
|
||||||
|
|
||||||
pTrans->clientRpc = rpcOpen(&rpcInit);
|
pTrans->clientRpc = rpcOpen(&rpcInit);
|
||||||
if (pTrans->clientRpc == NULL) {
|
if (pTrans->clientRpc == NULL) {
|
||||||
|
@ -440,7 +454,10 @@ int32_t dmInitStatusClient(SDnode *pDnode) {
|
||||||
rpcInit.supportBatch = 1;
|
rpcInit.supportBatch = 1;
|
||||||
rpcInit.batchSize = 8 * 1024;
|
rpcInit.batchSize = 8 * 1024;
|
||||||
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
|
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
|
||||||
(void)taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
|
||||||
|
if (taosVersionStrToInt(version, &(rpcInit.compatibilityVer)) != 0) {
|
||||||
|
dError("failed to convert version string:%s to int", version);
|
||||||
|
}
|
||||||
|
|
||||||
pTrans->statusRpc = rpcOpen(&rpcInit);
|
pTrans->statusRpc = rpcOpen(&rpcInit);
|
||||||
if (pTrans->statusRpc == NULL) {
|
if (pTrans->statusRpc == NULL) {
|
||||||
|
@ -485,7 +502,9 @@ int32_t dmInitSyncClient(SDnode *pDnode) {
|
||||||
rpcInit.supportBatch = 1;
|
rpcInit.supportBatch = 1;
|
||||||
rpcInit.batchSize = 8 * 1024;
|
rpcInit.batchSize = 8 * 1024;
|
||||||
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
|
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
|
||||||
(void)taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
if (taosVersionStrToInt(version, &(rpcInit.compatibilityVer)) != 0) {
|
||||||
|
dError("failed to convert version string:%s to int", version);
|
||||||
|
}
|
||||||
|
|
||||||
pTrans->syncRpc = rpcOpen(&rpcInit);
|
pTrans->syncRpc = rpcOpen(&rpcInit);
|
||||||
if (pTrans->syncRpc == NULL) {
|
if (pTrans->syncRpc == NULL) {
|
||||||
|
@ -536,7 +555,11 @@ int32_t dmInitServer(SDnode *pDnode) {
|
||||||
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||||
rpcInit.parent = pDnode;
|
rpcInit.parent = pDnode;
|
||||||
rpcInit.compressSize = tsCompressMsgSize;
|
rpcInit.compressSize = tsCompressMsgSize;
|
||||||
(void)taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
|
||||||
|
if (taosVersionStrToInt(version, &(rpcInit.compatibilityVer)) != 0) {
|
||||||
|
dError("failed to convert version string:%s to int", version);
|
||||||
|
}
|
||||||
|
|
||||||
pTrans->serverRpc = rpcOpen(&rpcInit);
|
pTrans->serverRpc = rpcOpen(&rpcInit);
|
||||||
if (pTrans->serverRpc == NULL) {
|
if (pTrans->serverRpc == NULL) {
|
||||||
dError("failed to init dnode rpc server since:%s", tstrerror(terrno));
|
dError("failed to init dnode rpc server since:%s", tstrerror(terrno));
|
||||||
|
|
|
@ -259,7 +259,9 @@ _OVER:
|
||||||
if (taosArrayGetSize(pData->dnodeEps) == 0) {
|
if (taosArrayGetSize(pData->dnodeEps) == 0) {
|
||||||
SDnodeEp dnodeEp = {0};
|
SDnodeEp dnodeEp = {0};
|
||||||
dnodeEp.isMnode = 1;
|
dnodeEp.isMnode = 1;
|
||||||
(void)taosGetFqdnPortFromEp(tsFirst, &dnodeEp.ep);
|
if (taosGetFqdnPortFromEp(tsFirst, &dnodeEp.ep) != 0) {
|
||||||
|
dError("failed to get fqdn and port from ep:%s", tsFirst);
|
||||||
|
}
|
||||||
if (taosArrayPush(pData->dnodeEps, &dnodeEp) == NULL) {
|
if (taosArrayPush(pData->dnodeEps, &dnodeEp) == NULL) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
@ -370,11 +372,19 @@ int32_t dmGetDnodeSize(SDnodeData *pData) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void dmUpdateEps(SDnodeData *pData, SArray *eps) {
|
void dmUpdateEps(SDnodeData *pData, SArray *eps) {
|
||||||
(void)taosThreadRwlockWrlock(&pData->lock);
|
if (taosThreadRwlockWrlock(&pData->lock) != 0) {
|
||||||
|
dError("failed to lock dnode lock");
|
||||||
|
}
|
||||||
|
|
||||||
dDebug("new dnode list get from mnode, dnodeVer:%" PRId64, pData->dnodeVer);
|
dDebug("new dnode list get from mnode, dnodeVer:%" PRId64, pData->dnodeVer);
|
||||||
dmResetEps(pData, eps);
|
dmResetEps(pData, eps);
|
||||||
(void)dmWriteEps(pData);
|
if (dmWriteEps(pData) != 0) {
|
||||||
(void)taosThreadRwlockUnlock(&pData->lock);
|
dError("failed to write dnode file");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosThreadRwlockUnlock(&pData->lock) != 0) {
|
||||||
|
dError("failed to unlock dnode lock");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dmResetEps(SDnodeData *pData, SArray *dnodeEps) {
|
static void dmResetEps(SDnodeData *pData, SArray *dnodeEps) {
|
||||||
|
@ -590,7 +600,9 @@ void dmRemoveDnodePairs(SDnodeData *pData) {
|
||||||
snprintf(file, sizeof(file), "%s%sdnode%sep.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);
|
snprintf(file, sizeof(file), "%s%sdnode%sep.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);
|
||||||
snprintf(bak, sizeof(bak), "%s%sdnode%sep.json.bak", tsDataDir, TD_DIRSEP, TD_DIRSEP);
|
snprintf(bak, sizeof(bak), "%s%sdnode%sep.json.bak", tsDataDir, TD_DIRSEP, TD_DIRSEP);
|
||||||
dInfo("dnode file:%s is rename to bak file", file);
|
dInfo("dnode file:%s is rename to bak file", file);
|
||||||
(void)taosRenameFile(file, bak);
|
if (taosRenameFile(file, bak) != 0) {
|
||||||
|
dError("failed to rename dnode file:%s to bak file:%s since %s", file, bak, tstrerror(terrno));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dmReadDnodePairs(SDnodeData *pData) {
|
static int32_t dmReadDnodePairs(SDnodeData *pData) {
|
||||||
|
|
|
@ -137,8 +137,12 @@ int32_t indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
|
||||||
TAOS_CHECK_GOTO(terrno, NULL, END);
|
TAOS_CHECK_GOTO(terrno, NULL, END);
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_UNUSED(taosThreadMutexInit(&idx->mtx, NULL));
|
if (taosThreadMutexInit(&idx->mtx, NULL) != 0) {
|
||||||
TAOS_UNUSED(tsem_init(&idx->sem, 0, 0));
|
TAOS_CHECK_GOTO(terrno, NULL, END);
|
||||||
|
}
|
||||||
|
if (tsem_init(&idx->sem, 0, 0) != 0) {
|
||||||
|
TAOS_CHECK_GOTO(terrno, NULL, END);
|
||||||
|
}
|
||||||
|
|
||||||
idx->refId = idxAddRef(idx);
|
idx->refId = idxAddRef(idx);
|
||||||
idx->opts = *opts;
|
idx->opts = *opts;
|
||||||
|
@ -213,7 +217,10 @@ void idxReleaseRef(int64_t ref) {
|
||||||
int32_t indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
|
int32_t indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
|
||||||
// TODO(yihao): reduce the lock range
|
// TODO(yihao): reduce the lock range
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
TAOS_UNUSED(taosThreadMutexLock(&index->mtx));
|
if (taosThreadMutexLock(&index->mtx) != 0) {
|
||||||
|
indexError("failed to lock index mutex");
|
||||||
|
}
|
||||||
|
|
||||||
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
|
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
|
||||||
SIndexTerm* p = taosArrayGetP(fVals, i);
|
SIndexTerm* p = taosArrayGetP(fVals, i);
|
||||||
|
|
||||||
|
@ -231,7 +238,9 @@ int32_t indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
TAOS_UNUSED(taosThreadMutexUnlock(&index->mtx));
|
if (taosThreadMutexUnlock(&index->mtx) != 0) {
|
||||||
|
indexError("failed to unlock index mutex");
|
||||||
|
}
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -463,7 +472,10 @@ static int32_t idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** resu
|
||||||
|
|
||||||
int32_t sz = idxSerialCacheKey(&key, buf);
|
int32_t sz = idxSerialCacheKey(&key, buf);
|
||||||
|
|
||||||
TAOS_UNUSED(taosThreadMutexLock(&sIdx->mtx));
|
if (taosThreadMutexLock(&sIdx->mtx) != 0) {
|
||||||
|
indexError("failed to lock index mutex");
|
||||||
|
}
|
||||||
|
|
||||||
IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz);
|
IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz);
|
||||||
cache = (pCache == NULL) ? NULL : *pCache;
|
cache = (pCache == NULL) ? NULL : *pCache;
|
||||||
TAOS_UNUSED(taosThreadMutexUnlock(&sIdx->mtx));
|
TAOS_UNUSED(taosThreadMutexUnlock(&sIdx->mtx));
|
||||||
|
@ -757,7 +769,9 @@ static int64_t idxGetAvailableVer(SIndex* sIdx, IndexCache* cache) {
|
||||||
|
|
||||||
IndexTFile* tf = (IndexTFile*)(sIdx->tindex);
|
IndexTFile* tf = (IndexTFile*)(sIdx->tindex);
|
||||||
|
|
||||||
TAOS_UNUSED(taosThreadMutexLock(&tf->mtx));
|
if (taosThreadMutexLock(&tf->mtx) != 0) {
|
||||||
|
indexError("failed to lock tfile mutex");
|
||||||
|
}
|
||||||
TFileReader* rd = tfileCacheGet(tf->cache, &key);
|
TFileReader* rd = tfileCacheGet(tf->cache, &key);
|
||||||
TAOS_UNUSED(taosThreadMutexUnlock(&tf->mtx));
|
TAOS_UNUSED(taosThreadMutexUnlock(&tf->mtx));
|
||||||
|
|
||||||
|
@ -801,9 +815,15 @@ static int32_t idxGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
|
||||||
TFileHeader* header = &reader->header;
|
TFileHeader* header = &reader->header;
|
||||||
ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)};
|
ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)};
|
||||||
|
|
||||||
TAOS_UNUSED(taosThreadMutexLock(&tf->mtx));
|
if (taosThreadMutexLock(&tf->mtx) != 0) {
|
||||||
|
indexError("failed to lock tfile mutex");
|
||||||
|
}
|
||||||
|
|
||||||
code = tfileCachePut(tf->cache, &key, reader);
|
code = tfileCachePut(tf->cache, &key, reader);
|
||||||
TAOS_UNUSED(taosThreadMutexUnlock(&tf->mtx));
|
|
||||||
|
if (taosThreadMutexUnlock(&tf->mtx) != 0) {
|
||||||
|
indexError("failed to unlock tfile mutex");
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
|
|
|
@ -398,8 +398,17 @@ IndexCache* idxCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8
|
||||||
cache->suid = suid;
|
cache->suid = suid;
|
||||||
cache->occupiedMem = 0;
|
cache->occupiedMem = 0;
|
||||||
|
|
||||||
TAOS_UNUSED(taosThreadMutexInit(&cache->mtx, NULL));
|
if (taosThreadMutexInit(&cache->mtx, NULL) != 0) {
|
||||||
TAOS_UNUSED(taosThreadCondInit(&cache->finished, NULL));
|
indexError("failed to create mutex for index cache");
|
||||||
|
taosMemoryFree(cache);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosThreadCondInit(&cache->finished, NULL) != 0) {
|
||||||
|
indexError("failed to create cond for index cache");
|
||||||
|
taosMemoryFree(cache);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
idxCacheRef(cache);
|
idxCacheRef(cache);
|
||||||
if (idx != NULL) {
|
if (idx != NULL) {
|
||||||
|
@ -410,10 +419,16 @@ IndexCache* idxCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8
|
||||||
void idxCacheDebug(IndexCache* cache) {
|
void idxCacheDebug(IndexCache* cache) {
|
||||||
MemTable* tbl = NULL;
|
MemTable* tbl = NULL;
|
||||||
|
|
||||||
TAOS_UNUSED(taosThreadMutexLock(&cache->mtx));
|
if ((taosThreadMutexLock(&cache->mtx)) != 0) {
|
||||||
|
indexError("failed to lock cache mutex");
|
||||||
|
}
|
||||||
|
|
||||||
tbl = cache->mem;
|
tbl = cache->mem;
|
||||||
idxMemRef(tbl);
|
idxMemRef(tbl);
|
||||||
TAOS_UNUSED(taosThreadMutexUnlock(&cache->mtx));
|
|
||||||
|
if (taosThreadMutexUnlock(&cache->mtx) != 0) {
|
||||||
|
indexError("failed to unlock cache mutex");
|
||||||
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
SSkipList* slt = tbl->mem;
|
SSkipList* slt = tbl->mem;
|
||||||
|
@ -432,7 +447,9 @@ void idxCacheDebug(IndexCache* cache) {
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
TAOS_UNUSED(taosThreadMutexLock(&cache->mtx));
|
if (taosThreadMutexLock(&cache->mtx) != 0) {
|
||||||
|
indexError("failed to lock cache mutex");
|
||||||
|
}
|
||||||
tbl = cache->imm;
|
tbl = cache->imm;
|
||||||
idxMemRef(tbl);
|
idxMemRef(tbl);
|
||||||
TAOS_UNUSED(taosThreadMutexUnlock(&cache->mtx));
|
TAOS_UNUSED(taosThreadMutexUnlock(&cache->mtx));
|
||||||
|
@ -480,7 +497,9 @@ void idxCacheDestroyImm(IndexCache* cache) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
MemTable* tbl = NULL;
|
MemTable* tbl = NULL;
|
||||||
TAOS_UNUSED(taosThreadMutexLock(&cache->mtx));
|
if (taosThreadMutexLock(&cache->mtx) != 0) {
|
||||||
|
indexError("failed to lock cache mutex");
|
||||||
|
}
|
||||||
|
|
||||||
tbl = cache->imm;
|
tbl = cache->imm;
|
||||||
cache->imm = NULL; // or throw int bg thread
|
cache->imm = NULL; // or throw int bg thread
|
||||||
|
@ -517,7 +536,11 @@ Iterate* idxCacheIteratorCreate(IndexCache* cache) {
|
||||||
if (iter == NULL) {
|
if (iter == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
TAOS_UNUSED(taosThreadMutexLock(&cache->mtx));
|
if (taosThreadMutexLock(&cache->mtx) != 0) {
|
||||||
|
indexError("failed to lock cache mutex");
|
||||||
|
taosMemoryFree(iter);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
idxMemRef(cache->imm);
|
idxMemRef(cache->imm);
|
||||||
|
|
||||||
|
@ -615,7 +638,9 @@ int idxCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
|
||||||
// ugly code, refactor later
|
// ugly code, refactor later
|
||||||
int64_t estimate = sizeof(ct) + strlen(ct->colVal);
|
int64_t estimate = sizeof(ct) + strlen(ct->colVal);
|
||||||
|
|
||||||
TAOS_UNUSED(taosThreadMutexLock(&pCache->mtx));
|
if (taosThreadMutexLock(&pCache->mtx) != 0) {
|
||||||
|
indexError("failed to lock cache mutex");
|
||||||
|
}
|
||||||
pCache->occupiedMem += estimate;
|
pCache->occupiedMem += estimate;
|
||||||
idxCacheMakeRoomForWrite(pCache);
|
idxCacheMakeRoomForWrite(pCache);
|
||||||
MemTable* tbl = pCache->mem;
|
MemTable* tbl = pCache->mem;
|
||||||
|
@ -623,7 +648,9 @@ int idxCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
|
||||||
TAOS_UNUSED(tSkipListPut(tbl->mem, (char*)ct));
|
TAOS_UNUSED(tSkipListPut(tbl->mem, (char*)ct));
|
||||||
idxMemUnRef(tbl);
|
idxMemUnRef(tbl);
|
||||||
|
|
||||||
TAOS_UNUSED(taosThreadMutexUnlock(&pCache->mtx));
|
if (taosThreadMutexUnlock(&pCache->mtx) != 0) {
|
||||||
|
indexError("failed to unlock cache mutex");
|
||||||
|
}
|
||||||
idxCacheUnRef(pCache);
|
idxCacheUnRef(pCache);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -631,13 +658,17 @@ void idxCacheForceToMerge(void* cache) {
|
||||||
IndexCache* pCache = cache;
|
IndexCache* pCache = cache;
|
||||||
|
|
||||||
idxCacheRef(pCache);
|
idxCacheRef(pCache);
|
||||||
TAOS_UNUSED(taosThreadMutexLock(&pCache->mtx));
|
if (taosThreadMutexLock(&pCache->mtx) != 0) {
|
||||||
|
indexError("failed to lock cache mutex");
|
||||||
|
}
|
||||||
|
|
||||||
indexInfo("%p is forced to merge into tfile", pCache);
|
indexInfo("%p is forced to merge into tfile", pCache);
|
||||||
pCache->occupiedMem += MEM_SIGNAL_QUIT;
|
pCache->occupiedMem += MEM_SIGNAL_QUIT;
|
||||||
idxCacheMakeRoomForWrite(pCache);
|
idxCacheMakeRoomForWrite(pCache);
|
||||||
|
|
||||||
TAOS_UNUSED(taosThreadMutexUnlock(&pCache->mtx));
|
if (taosThreadMutexUnlock(&pCache->mtx) != 0) {
|
||||||
|
indexError("failed to unlock cache mutex");
|
||||||
|
}
|
||||||
idxCacheUnRef(pCache);
|
idxCacheUnRef(pCache);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -668,12 +699,16 @@ int idxCacheSearch(void* cache, SIndexTermQuery* query, SIdxTRslt* result, STerm
|
||||||
IndexCache* pCache = cache;
|
IndexCache* pCache = cache;
|
||||||
|
|
||||||
MemTable *mem = NULL, *imm = NULL;
|
MemTable *mem = NULL, *imm = NULL;
|
||||||
TAOS_UNUSED(taosThreadMutexLock(&pCache->mtx));
|
if (taosThreadMutexLock(&pCache->mtx) != 0) {
|
||||||
|
indexError("failed to lock cache mutex");
|
||||||
|
}
|
||||||
mem = pCache->mem;
|
mem = pCache->mem;
|
||||||
imm = pCache->imm;
|
imm = pCache->imm;
|
||||||
idxMemRef(mem);
|
idxMemRef(mem);
|
||||||
idxMemRef(imm);
|
idxMemRef(imm);
|
||||||
TAOS_UNUSED(taosThreadMutexUnlock(&pCache->mtx));
|
if (taosThreadMutexUnlock(&pCache->mtx) != 0) {
|
||||||
|
indexError("failed to unlock cache mutex");
|
||||||
|
}
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
|
|
|
@ -994,7 +994,9 @@ Fst* fstCreate(FstSlice* slice) {
|
||||||
*s = fstSliceCopy(slice, 0, FST_SLICE_LEN(slice) - 1);
|
*s = fstSliceCopy(slice, 0, FST_SLICE_LEN(slice) - 1);
|
||||||
fst->data = s;
|
fst->data = s;
|
||||||
|
|
||||||
TAOS_UNUSED(taosThreadMutexInit(&fst->mtx, NULL));
|
if (taosThreadMutexInit(&fst->mtx, NULL) != 0) {
|
||||||
|
goto FST_CREAT_FAILED;
|
||||||
|
}
|
||||||
return fst;
|
return fst;
|
||||||
|
|
||||||
FST_CREAT_FAILED:
|
FST_CREAT_FAILED:
|
||||||
|
|
|
@ -739,7 +739,11 @@ IndexTFile* idxTFileCreate(SIndex* idx, const char* path) {
|
||||||
tfileCacheDestroy(cache);
|
tfileCacheDestroy(cache);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
TAOS_UNUSED(taosThreadMutexInit(&tfile->mtx, NULL));
|
if (taosThreadMutexInit(&tfile->mtx, NULL) != 0) {
|
||||||
|
taosMemoryFree(tfile);
|
||||||
|
tfileCacheDestroy(cache);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
tfile->cache = cache;
|
tfile->cache = cache;
|
||||||
return tfile;
|
return tfile;
|
||||||
}
|
}
|
||||||
|
@ -764,9 +768,16 @@ int idxTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTRslt* result) {
|
||||||
SIndexTerm* term = query->term;
|
SIndexTerm* term = query->term;
|
||||||
ICacheKey key = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};
|
ICacheKey key = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};
|
||||||
|
|
||||||
TAOS_UNUSED(taosThreadMutexLock(&pTfile->mtx));
|
if (taosThreadMutexLock(&pTfile->mtx) != 0) {
|
||||||
|
indexError("failed to lock tfile mutex");
|
||||||
|
}
|
||||||
|
|
||||||
TFileReader* reader = tfileCacheGet(pTfile->cache, &key);
|
TFileReader* reader = tfileCacheGet(pTfile->cache, &key);
|
||||||
TAOS_UNUSED(taosThreadMutexUnlock(&pTfile->mtx));
|
|
||||||
|
if (taosThreadMutexUnlock(&pTfile->mtx) != 0) {
|
||||||
|
indexError("failed to unlock tfile mutex");
|
||||||
|
}
|
||||||
|
|
||||||
if (reader == NULL) {
|
if (reader == NULL) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -883,9 +894,13 @@ TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName) {
|
||||||
TFileReader* rd = NULL;
|
TFileReader* rd = NULL;
|
||||||
ICacheKey key = {.suid = suid, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)};
|
ICacheKey key = {.suid = suid, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)};
|
||||||
|
|
||||||
TAOS_UNUSED(taosThreadMutexLock(&tf->mtx));
|
if (taosThreadMutexLock(&tf->mtx) != 0) {
|
||||||
|
indexError("failed to lock tfile mutex");
|
||||||
|
}
|
||||||
rd = tfileCacheGet(tf->cache, &key);
|
rd = tfileCacheGet(tf->cache, &key);
|
||||||
TAOS_UNUSED(taosThreadMutexUnlock(&tf->mtx));
|
if (taosThreadMutexUnlock(&tf->mtx) != 0) {
|
||||||
|
indexError("failed to unlock tfile mutex");
|
||||||
|
}
|
||||||
return rd;
|
return rd;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -324,8 +324,8 @@ bool lossyFloat = false;
|
||||||
bool lossyDouble = false;
|
bool lossyDouble = false;
|
||||||
|
|
||||||
// init call
|
// init call
|
||||||
int32_t tsCompressInit(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
|
void tsCompressInit(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals, uint32_t intervals,
|
||||||
uint32_t intervals, int32_t ifAdtFse, const char *compressor) {
|
int32_t ifAdtFse, const char *compressor) {
|
||||||
// config
|
// config
|
||||||
lossyFloat = strstr(lossyColumns, "float") != NULL;
|
lossyFloat = strstr(lossyColumns, "float") != NULL;
|
||||||
lossyDouble = strstr(lossyColumns, "double") != NULL;
|
lossyDouble = strstr(lossyColumns, "double") != NULL;
|
||||||
|
@ -333,7 +333,7 @@ int32_t tsCompressInit(char *lossyColumns, float fPrecision, double dPrecision,
|
||||||
tdszInit(fPrecision, dPrecision, maxIntervals, intervals, ifAdtFse, compressor);
|
tdszInit(fPrecision, dPrecision, maxIntervals, intervals, ifAdtFse, compressor);
|
||||||
if (lossyFloat) uTrace("lossy compression float is opened. ");
|
if (lossyFloat) uTrace("lossy compression float is opened. ");
|
||||||
if (lossyDouble) uTrace("lossy compression double is opened. ");
|
if (lossyDouble) uTrace("lossy compression double is opened. ");
|
||||||
return 0;
|
return;
|
||||||
}
|
}
|
||||||
// exit call
|
// exit call
|
||||||
void tsCompressExit() { tdszExit(); }
|
void tsCompressExit() { tdszExit(); }
|
||||||
|
|
Loading…
Reference in New Issue