diff --git a/cmake/taosadapter_CMakeLists.txt.in b/cmake/taosadapter_CMakeLists.txt.in
index 6cdff79629..7a12413377 100644
--- a/cmake/taosadapter_CMakeLists.txt.in
+++ b/cmake/taosadapter_CMakeLists.txt.in
@@ -2,7 +2,7 @@
# taosadapter
ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
- GIT_TAG 88d26c3
+ GIT_TAG 766dcc4
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in
index f90a163cb4..bc501b6dbc 100644
--- a/cmake/taostools_CMakeLists.txt.in
+++ b/cmake/taostools_CMakeLists.txt.in
@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
- GIT_TAG 58f58ee
+ GIT_TAG 8157e3b
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
diff --git a/include/common/tmsg.h b/include/common/tmsg.h
index dcd35f10f7..bfb80ec8f8 100644
--- a/include/common/tmsg.h
+++ b/include/common/tmsg.h
@@ -227,8 +227,7 @@ typedef struct SSubmitBlk {
int32_t sversion; // data schema version
int32_t dataLen; // data part length, not including the SSubmitBlk head
int32_t schemaLen; // schema length, if length is 0, no schema exists
- int16_t numOfRows; // total number of rows in current submit block
- int16_t padding; // TODO just for padding here
+ int32_t numOfRows; // total number of rows in current submit block
char data[];
} SSubmitBlk;
@@ -256,7 +255,7 @@ typedef struct {
int32_t sversion; // data schema version
int32_t dataLen; // data part length, not including the SSubmitBlk head
int32_t schemaLen; // schema length, if length is 0, no schema exists
- int16_t numOfRows; // total number of rows in current submit block
+ int32_t numOfRows; // total number of rows in current submit block
// head of SSubmitBlk
int32_t numOfBlocks;
const void* pMsg;
diff --git a/include/os/osEnv.h b/include/os/osEnv.h
index a3f92a0b29..798bfc197e 100644
--- a/include/os/osEnv.h
+++ b/include/os/osEnv.h
@@ -49,6 +49,8 @@ void osDefaultInit();
void osUpdate();
void osCleanup();
bool osLogSpaceAvailable();
+bool osDataSpaceAvailable();
+bool osTempSpaceAvailable();
void osSetTimezone(const char *timezone);
void osSetSystemLocale(const char *inLocale, const char *inCharSet);
diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c
index c2d621b311..6785390952 100644
--- a/source/client/src/clientMain.c
+++ b/source/client/src/clientMain.c
@@ -76,7 +76,7 @@ void taos_cleanup(void) {
cleanupTaskQueue();
taosConvDestroy();
-
+
tscInfo("all local resources released");
taosCleanupCfg();
taosCloseLog();
@@ -680,7 +680,7 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery);
pRequest->stableQuery = pQuery->stableQuery;
if (pQuery->pRoot) {
- pRequest->stmtType = pQuery->pRoot->type;
+ pRequest->stmtType = pQuery->pRoot->type;
}
}
@@ -785,9 +785,9 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
STscObj *pTscObj = pRequest->pTscObj;
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
if (NULL == pQuery->pRoot) {
- atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
+ atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
} else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
- atomic_add_fetch_64((int64_t *)&pActivity->numOfQueryReq, 1);
+ atomic_add_fetch_64((int64_t *)&pActivity->numOfQueryReq, 1);
}
}
@@ -809,6 +809,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, &catalogReq, retrieveMetaCallback, pWrapper,
&pRequest->body.queryJob);
+ pCxt = NULL;
if (code == TSDB_CODE_SUCCESS) {
return;
}
@@ -816,6 +817,8 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
_error:
tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code),
pRequest->requestId);
+ taosMemoryFree(pCxt);
+
terrno = code;
pRequest->code = code;
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
@@ -857,7 +860,7 @@ static void fetchCallback(void *pResult, void *param, int32_t code) {
STscObj *pTscObj = pRequest->pTscObj;
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
- atomic_add_fetch_64((int64_t *)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
+ atomic_add_fetch_64((int64_t *)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
}
pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows);
diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c
index 994cf9fdc1..c68c3fad95 100644
--- a/source/client/src/tmq.c
+++ b/source/client/src/tmq.c
@@ -3144,10 +3144,9 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
blk->uid = htobe64(uid);
blk->suid = htobe64(suid);
- blk->padding = htonl(blk->padding);
blk->sversion = htonl(pTableMeta->sversion);
blk->schemaLen = htonl(schemaLen);
- blk->numOfRows = htons(rows);
+ blk->numOfRows = htonl(rows);
blk->dataLen = htonl(dataLen);
subReq->length = sizeof(SSubmitReq) + sizeof(SSubmitBlk) + schemaLen + dataLen;
subReq->numOfBlocks = 1;
@@ -3373,10 +3372,9 @@ static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
blk->uid = htobe64(uid);
blk->suid = htobe64(suid);
- blk->padding = htonl(blk->padding);
blk->sversion = htonl(pSW->version);
blk->schemaLen = htonl(schemaLen);
- blk->numOfRows = htons(rows);
+ blk->numOfRows = htonl(rows);
blk->dataLen = htonl(dataLen);
subReq->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
subReq->numOfBlocks++;
diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c
index 8a240018fa..51c21eafa9 100644
--- a/source/common/src/tdatablock.c
+++ b/source/common/src/tdatablock.c
@@ -2028,11 +2028,10 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
int32_t dataLen = blk->dataLen;
blk->uid = htobe64(blk->uid);
blk->suid = htobe64(blk->suid);
- blk->padding = htonl(blk->padding);
blk->sversion = htonl(blk->sversion);
blk->dataLen = htonl(blk->dataLen);
blk->schemaLen = htonl(blk->schemaLen);
- blk->numOfRows = htons(blk->numOfRows);
+ blk->numOfRows = htonl(blk->numOfRows);
blk = (SSubmitBlk*)(blk->data + dataLen);
}
} else {
diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c
index efadcad350..8dd8a29c86 100644
--- a/source/common/src/tglobal.c
+++ b/source/common/src/tglobal.c
@@ -452,7 +452,7 @@ static void taosSetClientLogCfg(SConfig *pCfg) {
SConfigItem *pItem = cfgGetItem(pCfg, "logDir");
tstrncpy(tsLogDir, cfgGetItem(pCfg, "logDir")->str, PATH_MAX);
taosExpandDir(tsLogDir, tsLogDir, PATH_MAX);
- tsLogSpace.reserved = cfgGetItem(pCfg, "minimalLogDirGB")->fval;
+ tsLogSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalLogDirGB")->fval) * 1024 * 1024 * 1024);
tsNumOfLogLines = cfgGetItem(pCfg, "numOfLogLines")->i32;
tsAsyncLog = cfgGetItem(pCfg, "asyncLog")->bval;
tsLogKeepDays = cfgGetItem(pCfg, "logKeepDays")->i32;
@@ -502,7 +502,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tstrncpy(tsTempDir, cfgGetItem(pCfg, "tempDir")->str, PATH_MAX);
taosExpandDir(tsTempDir, tsTempDir, PATH_MAX);
- tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTmpDirGB")->fval;
+ tsTempSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalTmpDirGB")->fval) * 1024 * 1024 * 1024);
if (taosMulMkDir(tsTempDir) != 0) {
uError("failed to create tempDir:%s since %s", tsTempDir, terrstr());
return -1;
@@ -540,7 +540,7 @@ static void taosSetSystemCfg(SConfig *pCfg) {
}
static int32_t taosSetServerCfg(SConfig *pCfg) {
- tsDataSpace.reserved = cfgGetItem(pCfg, "minimalDataDirGB")->fval;
+ tsDataSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalDataDirGB")->fval) * 1024 * 1024 * 1024);
tsNumOfSupportVnodes = cfgGetItem(pCfg, "supportVnodes")->i32;
tsMaxShellConns = cfgGetItem(pCfg, "maxShellConns")->i32;
tsStatusInterval = cfgGetItem(pCfg, "statusInterval")->i32;
@@ -739,15 +739,15 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
}
case 'i': {
if (strcasecmp("minimalTmpDirGB", name) == 0) {
- tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTmpDirGB")->fval;
+ tsTempSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalTmpDirGB")->fval) * 1024 * 1024 * 1024);
} else if (strcasecmp("minimalDataDirGB", name) == 0) {
- tsDataSpace.reserved = cfgGetItem(pCfg, "minimalDataDirGB")->fval;
+ tsDataSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalDataDirGB")->fval) * 1024 * 1024 * 1024);
} else if (strcasecmp("minSlidingTime", name) == 0) {
tsMinSlidingTime = cfgGetItem(pCfg, "minSlidingTime")->i32;
} else if (strcasecmp("minIntervalTime", name) == 0) {
tsMinIntervalTime = cfgGetItem(pCfg, "minIntervalTime")->i32;
} else if (strcasecmp("minimalLogDirGB", name) == 0) {
- tsLogSpace.reserved = cfgGetItem(pCfg, "minimalLogDirGB")->fval;
+ tsLogSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalLogDirGB")->fval) * 1024 * 1024 * 1024);
}
break;
}
diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c
index 05b27546eb..fb9c64a880 100644
--- a/source/common/src/tmsg.c
+++ b/source/common/src/tmsg.c
@@ -76,7 +76,7 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
pIter->sversion = htonl((*pPBlock)->sversion);
pIter->dataLen = htonl((*pPBlock)->dataLen);
pIter->schemaLen = htonl((*pPBlock)->schemaLen);
- pIter->numOfRows = htons((*pPBlock)->numOfRows);
+ pIter->numOfRows = htonl((*pPBlock)->numOfRows);
}
return 0;
}
diff --git a/source/common/src/ttszip.c b/source/common/src/ttszip.c
index 03353b0de6..c86bf08e81 100644
--- a/source/common/src/ttszip.c
+++ b/source/common/src/ttszip.c
@@ -30,6 +30,12 @@ static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader);
* @return
*/
STSBuf* tsBufCreate(bool autoDelete, int32_t order) {
+ if (!osTempSpaceAvailable()) {
+ terrno = TSDB_CODE_TSC_NO_DISKSPACE;
+ // tscError("tmp file created failed since %s", terrstr());
+ return NULL;
+ }
+
STSBuf* pTSBuf = taosMemoryCalloc(1, sizeof(STSBuf));
if (pTSBuf == NULL) {
return NULL;
diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
index e4d6de849c..1a226abe5c 100644
--- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
+++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
@@ -176,7 +176,11 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
taosWriteQitem(pVnode->pFetchQ, pMsg);
break;
case WRITE_QUEUE:
- if ((pMsg->msgType == TDMT_VND_SUBMIT) && (grantCheck(TSDB_GRANT_STORAGE) != TSDB_CODE_SUCCESS)) {
+ if (!osDataSpaceAvailable()) {
+ terrno = TSDB_CODE_VND_NO_DISKSPACE;
+ code = terrno;
+ dError("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr());
+ } else if ((pMsg->msgType == TDMT_VND_SUBMIT) && (grantCheck(TSDB_GRANT_STORAGE) != TSDB_CODE_SUCCESS)) {
terrno = TSDB_CODE_VND_NO_WRITE_AUTH;
code = terrno;
dDebug("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr());
diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c
index 119d521827..5d72ce3b18 100644
--- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c
+++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c
@@ -49,8 +49,26 @@ static int32_t dmInitMonitor() {
return 0;
}
+static bool dmCheckDiskSpace() {
+ osUpdate();
+ if (!osDataSpaceAvailable()) {
+ dError("free disk size: %f GB, too little, require %f GB at least at least , quit", (double)tsDataSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsDataSpace.reserved / 1024.0 / 1024.0 / 1024.0);
+ return false;
+ }
+ if (!osLogSpaceAvailable()) {
+ dError("free disk size: %f GB, too little, require %f GB at least at least, quit", (double)tsLogSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsLogSpace.reserved / 1024.0 / 1024.0 / 1024.0);
+ return false;
+ }
+ if (!osTempSpaceAvailable()) {
+ dError("free disk size: %f GB, too little, require %f GB at least at least, quit", (double)tsTempSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsTempSpace.reserved / 1024.0 / 1024.0 / 1024.0);
+ return false;
+ }
+ return true;
+}
+
int32_t dmInit(int8_t rtype) {
dInfo("start to init dnode env");
+ if (!dmCheckDiskSpace()) return -1;
if (dmCheckRepeatInit(dmInstance()) != 0) return -1;
if (dmInitSystem() != 0) return -1;
if (dmInitMonitor() != 0) return -1;
diff --git a/source/dnode/mgmt/node_mgmt/src/dmNodes.c b/source/dnode/mgmt/node_mgmt/src/dmNodes.c
index ecbb695e02..99ffd73a7a 100644
--- a/source/dnode/mgmt/node_mgmt/src/dmNodes.c
+++ b/source/dnode/mgmt/node_mgmt/src/dmNodes.c
@@ -265,6 +265,7 @@ static void dmWatchNodes(SDnode *pDnode) {
}
int32_t dmRunDnode(SDnode *pDnode) {
+ int count = 0;
if (dmOpenNodes(pDnode) != 0) {
dError("failed to open nodes since %s", terrstr());
return -1;
@@ -274,7 +275,6 @@ int32_t dmRunDnode(SDnode *pDnode) {
dError("failed to start nodes since %s", terrstr());
return -1;
}
-
while (1) {
if (pDnode->stop) {
dInfo("TDengine is about to stop");
@@ -285,6 +285,9 @@ int32_t dmRunDnode(SDnode *pDnode) {
}
dmWatchNodes(pDnode);
+ if (count == 0) osUpdate();
+ count %= 10;
+ count++;
taosMsleep(100);
}
}
diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c
index 810dcb9049..b8ff7be46a 100644
--- a/source/dnode/mnode/impl/src/mndSync.c
+++ b/source/dnode/mnode/impl/src/mndSync.c
@@ -149,7 +149,7 @@ int32_t mndSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWrit
int32_t mndSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
mInfo("stop to apply snapshot to sdb, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, isApply,
- pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastApplyIndex);
+ pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);
SMnode *pMnode = pFsm->data;
return sdbStopWrite(pMnode->pSdb, pWriter, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm,
pSnapshot->lastConfigIndex);
diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c
index 850394b15d..6b4d5a2f3e 100644
--- a/source/dnode/vnode/src/tq/tqSink.c
+++ b/source/dnode/vnode/src/tq/tqSink.c
@@ -117,7 +117,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
- blkHead->numOfRows = htons(pDataBlock->info.rows);
+ blkHead->numOfRows = htonl(pDataBlock->info.rows);
blkHead->sversion = htonl(pTSchema->version);
// TODO
blkHead->suid = htobe64(suid);
diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c
index a221bc1795..383652531e 100644
--- a/source/dnode/vnode/src/tsdb/tsdbWrite.c
+++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c
@@ -111,7 +111,7 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
// pBlock->sversion = htonl(pBlock->sversion);
// pBlock->dataLen = htonl(pBlock->dataLen);
// pBlock->schemaLen = htonl(pBlock->schemaLen);
- // pBlock->numOfRows = htons(pBlock->numOfRows);
+ // pBlock->numOfRows = htonl(pBlock->numOfRows);
#if 0
if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) {
diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c
index f3550bf2ee..52698d0be7 100644
--- a/source/dnode/vnode/src/vnd/vnodeSync.c
+++ b/source/dnode/vnode/src/vnd/vnodeSync.c
@@ -630,7 +630,7 @@ static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool
#ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data;
vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, pVnode->config.vgId, isApply,
- pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastApplyIndex);
+ pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);
int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, code);
diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp
index 2a8c7a583c..33623f1bdd 100644
--- a/source/dnode/vnode/test/tsdbSmaTest.cpp
+++ b/source/dnode/vnode/test/tsdbSmaTest.cpp
@@ -395,9 +395,8 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
pBlk->uid = htobe64(tbUid);
pBlk->suid = htobe64(tbUid);
pBlk->sversion = htonl(schemaVer);
- pBlk->padding = htonl(0);
pBlk->schemaLen = htonl(0);
- pBlk->numOfRows = htons(mockRowNum);
+ pBlk->numOfRows = htonl(mockRowNum);
pBlk->dataLen = htonl(mockRowNum * mockRowLen);
for (uint32_t r = 0; r < mockRowNum; ++r) {
pRow = (STSRow *)POINTER_SHIFT(pBlk, sizeof(SSubmitBlk) + r * mockRowLen);
diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c
index 0f97b5c5b1..4652c66ebc 100644
--- a/source/libs/catalog/src/ctgRemote.c
+++ b/source/libs/catalog/src/ctgRemote.c
@@ -13,24 +13,25 @@
* along with this program. If not, see .
*/
-#include "trpc.h"
-#include "query.h"
-#include "tname.h"
#include "catalogInt.h"
+#include "query.h"
#include "systable.h"
+#include "tname.h"
#include "tref.h"
+#include "trpc.h"
-int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf *pMsg, int32_t rspCode) {
- int32_t code = 0;
- SArray* pTaskId = cbParam->taskId;
+int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf* pMsg, int32_t rspCode) {
+ int32_t code = 0;
+ SArray* pTaskId = cbParam->taskId;
SCatalog* pCtg = pJob->pCtg;
- int32_t taskNum = taosArrayGetSize(pTaskId);
- SDataBuf taskMsg = *pMsg;
- int32_t offset = 0;
+ int32_t taskNum = taosArrayGetSize(pTaskId);
+ SDataBuf taskMsg = *pMsg;
+ int32_t offset = 0;
int32_t msgNum = (TSDB_CODE_SUCCESS == rspCode && pMsg->pData && (pMsg->len > 0)) ? ntohl(*(int32_t*)pMsg->pData) : 0;
ASSERT(taskNum == msgNum || 0 == msgNum);
- ctgDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId, TMSG_INFO(cbParam->reqType + 1));
+ ctgDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId,
+ TMSG_INFO(cbParam->reqType + 1));
offset += sizeof(msgNum);
SBatchRsp rsp = {0};
@@ -39,10 +40,10 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
ctgError("taosHashInit %d batch failed", taskNum);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
-
+
for (int32_t i = 0; i < taskNum; ++i) {
- int32_t* taskId = taosArrayGet(pTaskId, i);
- SCtgTask *pTask = taosArrayGet(pJob->pTasks, *taskId);
+ int32_t* taskId = taosArrayGet(pTaskId, i);
+ SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId);
if (msgNum > 0) {
rsp.reqType = ntohl(*(int32_t*)((char*)pMsg->pData + offset));
offset += sizeof(rsp.reqType);
@@ -52,7 +53,7 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
offset += sizeof(rsp.rspCode);
rsp.msg = ((char*)pMsg->pData) + offset;
offset += rsp.msgLen;
-
+
taskMsg.msgType = rsp.reqType;
taskMsg.pData = rsp.msg;
taskMsg.len = rsp.msgLen;
@@ -64,9 +65,10 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
}
pTask->pBatchs = pBatchs;
-
- ctgDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(taskMsg.msgType + 1));
-
+
+ ctgDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId,
+ TMSG_INFO(taskMsg.msgType + 1));
+
(*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode));
}
@@ -78,23 +80,22 @@ _return:
CTG_RET(code);
}
-
int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target) {
int32_t code = 0;
-
+
switch (reqType) {
case TDMT_MND_QNODE_LIST: {
if (TSDB_CODE_SUCCESS != rspCode) {
qError("error rsp for qnode list, error:%s", tstrerror(rspCode));
CTG_ERR_RET(rspCode);
}
-
+
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process qnode list rsp failed, error:%s", tstrerror(rspCode));
CTG_ERR_RET(code);
}
-
+
qDebug("Got qnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(out));
break;
}
@@ -103,13 +104,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qError("error rsp for dnode list, error:%s", tstrerror(rspCode));
CTG_ERR_RET(rspCode);
}
-
+
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process dnode list rsp failed, error:%s", tstrerror(rspCode));
CTG_ERR_RET(code);
}
-
+
qDebug("Got dnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(*(SArray**)out));
break;
}
@@ -118,13 +119,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qError("error rsp for use db, error:%s, dbFName:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode);
}
-
+
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process use db rsp failed, error:%s, dbFName:%s", tstrerror(code), target);
CTG_ERR_RET(code);
}
-
+
qDebug("Got db vgInfo from mnode, dbFName:%s", target);
break;
}
@@ -133,13 +134,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qError("error rsp for get db cfg, error:%s, db:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode);
}
-
+
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process get db cfg rsp failed, error:%s, db:%s", tstrerror(code), target);
CTG_ERR_RET(code);
}
-
+
qDebug("Got db cfg from mnode, dbFName:%s", target);
break;
}
@@ -148,13 +149,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qError("error rsp for get index, error:%s, indexName:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode);
}
-
+
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process get index rsp failed, error:%s, indexName:%s", tstrerror(code), target);
CTG_ERR_RET(code);
}
-
+
qDebug("Got index from mnode, indexName:%s", target);
break;
}
@@ -163,13 +164,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qError("error rsp for get table index, error:%s, tbFName:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode);
}
-
+
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process get table index rsp failed, error:%s, tbFName:%s", tstrerror(code), target);
CTG_ERR_RET(code);
}
-
+
qDebug("Got table index from mnode, tbFName:%s", target);
break;
}
@@ -178,13 +179,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qError("error rsp for get udf, error:%s, funcName:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode);
}
-
+
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process get udf rsp failed, error:%s, funcName:%s", tstrerror(code), target);
CTG_ERR_RET(code);
}
-
+
qDebug("Got udf from mnode, funcName:%s", target);
break;
}
@@ -193,13 +194,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qError("error rsp for get user auth, error:%s, user:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode);
}
-
+
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process get user auth rsp failed, error:%s, user:%s", tstrerror(code), target);
CTG_ERR_RET(code);
}
-
+
qDebug("Got user auth from mnode, user:%s", target);
break;
}
@@ -210,17 +211,17 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qDebug("stablemeta not exist in mnode, tbFName:%s", target);
return TSDB_CODE_SUCCESS;
}
-
+
qError("error rsp for stablemeta from mnode, error:%s, tbFName:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode);
}
-
+
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process mnode stablemeta rsp failed, error:%s, tbFName:%s", tstrerror(code), target);
CTG_ERR_RET(code);
}
-
+
qDebug("Got table meta from mnode, tbFName:%s", target);
break;
}
@@ -231,17 +232,17 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qDebug("tablemeta not exist in vnode, tbFName:%s", target);
return TSDB_CODE_SUCCESS;
}
-
+
qError("error rsp for table meta from vnode, code:%s, tbFName:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode);
}
-
+
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process vnode tablemeta rsp failed, code:%s, tbFName:%s", tstrerror(code), target);
CTG_ERR_RET(code);
}
-
+
qDebug("Got table meta from vnode, tbFName:%s", target);
break;
}
@@ -250,13 +251,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qError("error rsp for table cfg from vnode, code:%s, tbFName:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode);
}
-
+
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process vnode tb cfg rsp failed, code:%s, tbFName:%s", tstrerror(code), target);
CTG_ERR_RET(code);
}
-
+
qDebug("Got table cfg from vnode, tbFName:%s", target);
break;
}
@@ -265,28 +266,28 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qError("error rsp for stb cfg from mnode, error:%s, tbFName:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode);
}
-
+
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process mnode stb cfg rsp failed, error:%s, tbFName:%s", tstrerror(code), target);
CTG_ERR_RET(code);
}
-
+
qDebug("Got stb cfg from mnode, tbFName:%s", target);
break;
- }
+ }
case TDMT_MND_SERVER_VERSION: {
if (TSDB_CODE_SUCCESS != rspCode) {
qError("error rsp for svr ver from mnode, error:%s", tstrerror(rspCode));
CTG_ERR_RET(rspCode);
}
-
+
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process svr ver rsp failed, error:%s", tstrerror(code));
CTG_ERR_RET(code);
}
-
+
qDebug("Got svr ver from mnode");
break;
}
@@ -295,7 +296,7 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qError("Got error rsp, error:%s", tstrerror(rspCode));
CTG_ERR_RET(rspCode);
}
-
+
qError("invalid req type %s", TMSG_INFO(reqType));
return TSDB_CODE_APP_ERROR;
}
@@ -303,12 +304,11 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
return TSDB_CODE_SUCCESS;
}
-
-int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
+int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) {
SCtgTaskCallbackParam* cbParam = (SCtgTaskCallbackParam*)param;
- int32_t code = 0;
- SCtgJob* pJob = NULL;
-
+ int32_t code = 0;
+ SCtgJob* pJob = NULL;
+
CTG_API_JENTER();
pJob = taosAcquireRef(gCtgMgmt.jobPool, cbParam->refId);
@@ -322,13 +322,15 @@ int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
if (TDMT_VND_BATCH_META == cbParam->reqType || TDMT_MND_BATCH_META == cbParam->reqType) {
CTG_ERR_JRET(ctgHandleBatchRsp(pJob, cbParam, pMsg, rspCode));
} else {
- int32_t *taskId = taosArrayGet(cbParam->taskId, 0);
- SCtgTask *pTask = taosArrayGet(pJob->pTasks, *taskId);
+ int32_t* taskId = taosArrayGet(cbParam->taskId, 0);
+ SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId);
- qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(cbParam->reqType + 1));
+ qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId,
+ TMSG_INFO(cbParam->reqType + 1));
#if CTG_BATCH_FETCH
- SHashObj* pBatchs = taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
+ SHashObj* pBatchs =
+ taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (NULL == pBatchs) {
ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
@@ -339,10 +341,10 @@ int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, cbParam->reqType, pMsg, rspCode));
#if CTG_BATCH_FETCH
- CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs));
-#endif
+ CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs));
+#endif
}
-
+
_return:
taosMemoryFree(pMsg->pData);
@@ -354,16 +356,16 @@ _return:
CTG_API_LEAVE(code);
}
-
-int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, int32_t msgType, SMsgSendInfo **pMsgSendInfo) {
+int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, int32_t msgType,
+ SMsgSendInfo** pMsgSendInfo) {
int32_t code = 0;
- SMsgSendInfo *msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
+ SMsgSendInfo* msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == msgSendInfo) {
qError("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
CTG_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
- SCtgTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SCtgTaskCallbackParam));
+ SCtgTaskCallbackParam* param = taosMemoryCalloc(1, sizeof(SCtgTaskCallbackParam));
if (NULL == param) {
qError("calloc %d failed", (int32_t)sizeof(SCtgTaskCallbackParam));
CTG_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
@@ -391,10 +393,10 @@ _return:
CTG_RET(code);
}
-int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob* pJob, SArray* pTaskId,
- int32_t batchId, char* dbFName, int32_t vgId, int32_t msgType, void *msg, uint32_t msgSize) {
- int32_t code = 0;
- SMsgSendInfo *pMsgSendInfo = NULL;
+int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob* pJob, SArray* pTaskId, int32_t batchId,
+ char* dbFName, int32_t vgId, int32_t msgType, void* msg, uint32_t msgSize) {
+ int32_t code = 0;
+ SMsgSendInfo* pMsgSendInfo = NULL;
CTG_ERR_JRET(ctgMakeMsgSendInfo(pJob, pTaskId, batchId, msgType, &pMsgSendInfo));
ctgUpdateSendTargetInfo(pMsgSendInfo, msgType, dbFName, vgId);
@@ -426,22 +428,23 @@ _return:
CTG_RET(code);
}
-int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgTask* pTask, int32_t msgType, void *msg, uint32_t msgSize) {
- int32_t code = 0;
- SHashObj* pBatchs = pTask->pBatchs;
- SCtgJob* pJob = pTask->pJob;
+int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgTask* pTask, int32_t msgType, void* msg,
+ uint32_t msgSize) {
+ int32_t code = 0;
+ SHashObj* pBatchs = pTask->pBatchs;
+ SCtgJob* pJob = pTask->pJob;
SCtgBatch* pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId));
- int32_t taskNum = taosArrayGetSize(pTask->pJob->pTasks);
- SCtgBatch newBatch = {0};
- SBatchMsg req = {0};
-
+ int32_t taskNum = taosArrayGetSize(pTask->pJob->pTasks);
+ SCtgBatch newBatch = {0};
+ SBatchMsg req = {0};
+
if (NULL == pBatch) {
newBatch.pMsgs = taosArrayInit(taskNum, sizeof(SBatchMsg));
newBatch.pTaskIds = taosArrayInit(taskNum, sizeof(int32_t));
if (NULL == newBatch.pMsgs || NULL == newBatch.pTaskIds) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
-
+
newBatch.conn = *pConn;
req.msgType = msgType;
@@ -475,7 +478,8 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
- ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), newBatch.batchId, vgId);
+ ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), newBatch.batchId,
+ vgId);
return TSDB_CODE_SUCCESS;
}
@@ -504,7 +508,8 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT
}
}
- ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pBatch->batchId, vgId);
+ ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pBatch->batchId,
+ vgId);
return TSDB_CODE_SUCCESS;
@@ -512,24 +517,24 @@ _return:
ctgFreeBatch(&newBatch);
taosMemoryFree(msg);
-
+
return code;
}
int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) {
- *msg = taosMemoryMalloc(pBatch->msgSize);
+ *msg = taosMemoryCalloc(1, pBatch->msgSize);
if (NULL == (*msg)) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
- int32_t offset = 0;
- int32_t num = taosArrayGetSize(pBatch->pMsgs);
- SBatchReq *pBatchReq = (SBatchReq*)(*msg);
+ int32_t offset = 0;
+ int32_t num = taosArrayGetSize(pBatch->pMsgs);
+ SBatchReq* pBatchReq = (SBatchReq*)(*msg);
pBatchReq->header.vgId = htonl(vgId);
pBatchReq->msgNum = htonl(num);
offset += sizeof(SBatchReq);
-
+
for (int32_t i = 0; i < num; ++i) {
SBatchMsg* pReq = taosArrayGet(pBatch->pMsgs, i);
*(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgType);
@@ -547,23 +552,23 @@ int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) {
return TSDB_CODE_SUCCESS;
}
-int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob *pJob, SHashObj* pBatchs) {
+int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob* pJob, SHashObj* pBatchs) {
int32_t code = 0;
- void* msg = NULL;
- void* p = taosHashIterate(pBatchs, NULL);
+ void* msg = NULL;
+ void* p = taosHashIterate(pBatchs, NULL);
while (NULL != p) {
- size_t len = 0;
- int32_t* vgId = taosHashGetKey(p, &len);
+ size_t len = 0;
+ int32_t* vgId = taosHashGetKey(p, &len);
SCtgBatch* pBatch = (SCtgBatch*)p;
ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId);
-
+
CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg));
- code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId,
- pBatch->dbFName, *vgId, pBatch->msgType, msg, pBatch->msgSize);
+ code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, pBatch->dbFName, *vgId,
+ pBatch->msgType, msg, pBatch->msgSize);
pBatch->pTaskIds = NULL;
CTG_ERR_JRET(code);
-
+
p = taosHashIterate(pBatchs, p);
}
@@ -575,16 +580,15 @@ _return:
taosHashCancelIterate(pBatchs, p);
}
taosMemoryFree(msg);
-
+
CTG_RET(code);
}
-
-int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray *out, SCtgTask* pTask) {
- char *msg = NULL;
+int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray* out, SCtgTask* pTask) {
+ char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_QNODE_LIST;
- void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
+ void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get qnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse);
@@ -609,14 +613,14 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
taosArrayPush(pTaskId, &pTask->taskId);
-
+
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif
}
-
+
SRpcMsg rpcMsg = {
.msgType = reqType,
- .pCont = msg,
+ .pCont = msg,
.contLen = msgLen,
};
@@ -630,11 +634,11 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray
return TSDB_CODE_SUCCESS;
}
-int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray **out, SCtgTask* pTask) {
- char *msg = NULL;
+int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray** out, SCtgTask* pTask) {
+ char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_DNODE_LIST;
- void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
+ void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get dnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse);
@@ -655,14 +659,14 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
taosArrayPush(pTaskId, &pTask->taskId);
-
+
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif
}
-
+
SRpcMsg rpcMsg = {
.msgType = reqType,
- .pCont = msg,
+ .pCont = msg,
.contLen = msgLen,
};
@@ -676,12 +680,12 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray
return TSDB_CODE_SUCCESS;
}
-
-int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildUseDBInput *input, SUseDbOutput *out, SCtgTask* pTask) {
- char *msg = NULL;
+int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SBuildUseDBInput* input, SUseDbOutput* out,
+ SCtgTask* pTask) {
+ char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_USE_DB;
- void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
+ void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get db vgInfo from mnode, dbFName:%s", input->db);
@@ -706,14 +710,14 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
taosArrayPush(pTaskId, &pTask->taskId);
-
+
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif
}
-
+
SRpcMsg rpcMsg = {
.msgType = reqType,
- .pCont = msg,
+ .pCont = msg,
.contLen = msgLen,
};
@@ -721,21 +725,22 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU
rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, input->db));
-
+
rpcFreeCont(rpcRsp.pCont);
-
+
return TSDB_CODE_SUCCESS;
}
-int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask) {
- char *msg = NULL;
+int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, SDbCfgInfo* out,
+ SCtgTask* pTask) {
+ char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_DB_CFG;
- void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
+ void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get db cfg from mnode, dbFName:%s", dbFName);
- int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)dbFName, &msg, 0, &msgLen, mallocFp);
+ int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)dbFName, &msg, 0, &msgLen, mallocFp);
if (code) {
ctgError("Build get db cfg msg failed, code:%x, db:%s", code, dbFName);
CTG_ERR_RET(code);
@@ -756,14 +761,14 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
taosArrayPush(pTaskId, &pTask->taskId);
-
+
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif
}
-
+
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_GET_DB_CFG,
- .pCont = msg,
+ .pCont = msg,
.contLen = msgLen,
};
@@ -777,15 +782,16 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char
return TSDB_CODE_SUCCESS;
}
-int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *indexName, SIndexInfo *out, SCtgTask* pTask) {
- char *msg = NULL;
+int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* indexName, SIndexInfo* out,
+ SCtgTask* pTask) {
+ char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_INDEX;
- void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
+ void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get index from mnode, indexName:%s", indexName);
- int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)indexName, &msg, 0, &msgLen, mallocFp);
+ int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)indexName, &msg, 0, &msgLen, mallocFp);
if (code) {
ctgError("Build get index msg failed, code:%x, db:%s", code, indexName);
CTG_ERR_RET(code);
@@ -800,20 +806,20 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
-#else
+#else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
taosArrayPush(pTaskId, &pTask->taskId);
-
+
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif
}
-
+
SRpcMsg rpcMsg = {
.msgType = reqType,
- .pCont = msg,
+ .pCont = msg,
.contLen = msgLen,
};
@@ -823,21 +829,22 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)indexName));
rpcFreeCont(rpcRsp.pCont);
-
+
return TSDB_CODE_SUCCESS;
}
-int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *name, STableIndex* out, SCtgTask* pTask) {
- char *msg = NULL;
+int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* name, STableIndex* out,
+ SCtgTask* pTask) {
+ char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_TABLE_INDEX;
- void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
+ void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(name, tbFName);
ctgDebug("try to get tb index from mnode, tbFName:%s", tbFName);
- int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)tbFName, &msg, 0, &msgLen, mallocFp);
+ int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)tbFName, &msg, 0, &msgLen, mallocFp);
if (code) {
ctgError("Build get index msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
CTG_ERR_RET(code);
@@ -848,25 +855,25 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n
if (NULL == pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
-
+
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)tbFName));
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
-#else
+#else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
taosArrayPush(pTaskId, &pTask->taskId);
-
+
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif
}
-
+
SRpcMsg rpcMsg = {
.msgType = reqType,
- .pCont = msg,
+ .pCont = msg,
.contLen = msgLen,
};
@@ -876,19 +883,20 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName));
rpcFreeCont(rpcRsp.pCont);
-
+
return TSDB_CODE_SUCCESS;
}
-int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *funcName, SFuncInfo *out, SCtgTask* pTask) {
- char *msg = NULL;
+int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* funcName, SFuncInfo* out,
+ SCtgTask* pTask) {
+ char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_RETRIEVE_FUNC;
- void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
+ void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get udf info from mnode, funcName:%s", funcName);
- int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)funcName, &msg, 0, &msgLen, mallocFp);
+ int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)funcName, &msg, 0, &msgLen, mallocFp);
if (code) {
ctgError("Build get udf msg failed, code:%x, db:%s", code, funcName);
CTG_ERR_RET(code);
@@ -909,14 +917,14 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ch
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
taosArrayPush(pTaskId, &pTask->taskId);
-
+
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif
}
-
+
SRpcMsg rpcMsg = {
.msgType = reqType,
- .pCont = msg,
+ .pCont = msg,
.contLen = msgLen,
};
@@ -930,15 +938,16 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ch
return TSDB_CODE_SUCCESS;
}
-int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask) {
- char *msg = NULL;
+int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, SGetUserAuthRsp* out,
+ SCtgTask* pTask) {
+ char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_USER_AUTH;
- void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
+ void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get user auth from mnode, user:%s", user);
- int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)user, &msg, 0, &msgLen, mallocFp);
+ int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)user, &msg, 0, &msgLen, mallocFp);
if (code) {
ctgError("Build get user auth msg failed, code:%x, db:%s", code, user);
CTG_ERR_RET(code);
@@ -953,20 +962,20 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
-#else
+#else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
taosArrayPush(pTaskId, &pTask->taskId);
-
+
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif
}
-
+
SRpcMsg rpcMsg = {
.msgType = reqType,
- .pCont = msg,
+ .pCont = msg,
.contLen = msgLen,
};
@@ -976,20 +985,20 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)user));
rpcFreeCont(rpcRsp.pCont);
-
+
return TSDB_CODE_SUCCESS;
}
-
-int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask) {
+int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char* dbFName, char* tbName,
+ STableMetaOutput* out, SCtgTask* pTask) {
SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName};
- char *msg = NULL;
- SEpSet *pVnodeEpSet = NULL;
- int32_t msgLen = 0;
- int32_t reqType = TDMT_MND_TABLE_META;
- char tbFName[TSDB_TABLE_FNAME_LEN];
+ char* msg = NULL;
+ SEpSet* pVnodeEpSet = NULL;
+ int32_t msgLen = 0;
+ int32_t reqType = TDMT_MND_TABLE_META;
+ char tbFName[TSDB_TABLE_FNAME_LEN];
sprintf(tbFName, "%s.%s", dbFName, tbName);
- void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
+ void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get table meta from mnode, tbFName:%s", tbFName);
@@ -1007,26 +1016,26 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, tbFName));
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
-#else
+#else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
taosArrayPush(pTaskId, &pTask->taskId);
-
+
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif
}
SRpcMsg rpcMsg = {
.msgType = reqType,
- .pCont = msg,
+ .pCont = msg,
.contLen = msgLen,
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
-
+
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName));
rpcFreeCont(rpcRsp.pCont);
@@ -1034,27 +1043,30 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char
return TSDB_CODE_SUCCESS;
}
-int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableMetaOutput* out, SCtgTask* pTask) {
+int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableMetaOutput* out,
+ SCtgTask* pTask) {
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName);
- return ctgGetTbMetaFromMnodeImpl(pCtg, pConn, dbFName, (char *)pTableName->tname, out, pTask);
+ return ctgGetTbMetaFromMnodeImpl(pCtg, pConn, dbFName, (char*)pTableName->tname, out, pTask);
}
-int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* out, SCtgTask* pTask) {
+int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* vgroupInfo,
+ STableMetaOutput* out, SCtgTask* pTask) {
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName);
int32_t reqType = TDMT_VND_TABLE_META;
- char tbFName[TSDB_TABLE_FNAME_LEN];
+ char tbFName[TSDB_TABLE_FNAME_LEN];
sprintf(tbFName, "%s.%s", dbFName, pTableName->tname);
- void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
+ void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
- ctgDebug("try to get table meta from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s",
- vgroupInfo->vgId, vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName);
+ ctgDebug("try to get table meta from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId,
+ vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName);
- SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char *)tNameGetTableName(pTableName)};
- char *msg = NULL;
+ SBuildTableInput bInput = {
+ .vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)tNameGetTableName(pTableName)};
+ char* msg = NULL;
int32_t msgLen = 0;
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
@@ -1070,16 +1082,16 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, tbFName));
- SRequestConnInfo vConn = {.pTrans = pConn->pTrans,
- .requestId = pConn->requestId,
- .requestObjRefId = pConn->requestObjRefId,
- .mgmtEps = vgroupInfo->epSet};
+ SRequestConnInfo vConn = {.pTrans = pConn->pTrans,
+ .requestId = pConn->requestId,
+ .requestObjRefId = pConn->requestObjRefId,
+ .mgmtEps = vgroupInfo->epSet};
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, pTask, reqType, msg, msgLen));
-#else
+#else
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
- char dbFName[TSDB_DB_FNAME_LEN];
+ char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(ctx->pName, dbFName);
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) {
@@ -1087,40 +1099,41 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa
}
taosArrayPush(pTaskId, &pTask->taskId);
- CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, dbFName, ctx->vgId, reqType, msg, msgLen));
-#endif
+ CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, dbFName, ctx->vgId, reqType, msg, msgLen));
+#endif
}
SRpcMsg rpcMsg = {
.msgType = reqType,
- .pCont = msg,
+ .pCont = msg,
.contLen = msgLen,
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pConn->pTrans, &vgroupInfo->epSet, &rpcMsg, &rpcRsp);
- CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName));
+ CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName));
rpcFreeCont(rpcRsp.pCont);
return TSDB_CODE_SUCCESS;
}
-int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableCfg **out, SCtgTask* pTask) {
- char *msg = NULL;
+int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName,
+ SVgroupInfo* vgroupInfo, STableCfg** out, SCtgTask* pTask) {
+ char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_VND_TABLE_CFG;
- char tbFName[TSDB_TABLE_FNAME_LEN];
+ char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTableName, tbFName);
- void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
+ void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName);
SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)pTableName->tname};
SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
- ctgDebug("try to get table cfg from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s",
- vgroupInfo->vgId, vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName);
+ ctgDebug("try to get table cfg from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId,
+ vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
if (code) {
@@ -1131,29 +1144,29 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S
if (pTask) {
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, (char*)tbFName));
- SRequestConnInfo vConn = {.pTrans = pConn->pTrans,
- .requestId = pConn->requestId,
- .requestObjRefId = pConn->requestObjRefId,
- .mgmtEps = vgroupInfo->epSet};
+ SRequestConnInfo vConn = {.pTrans = pConn->pTrans,
+ .requestId = pConn->requestId,
+ .requestObjRefId = pConn->requestObjRefId,
+ .mgmtEps = vgroupInfo->epSet};
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, pTask, reqType, msg, msgLen));
#else
SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx;
- char dbFName[TSDB_DB_FNAME_LEN];
+ char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(ctx->pName, dbFName);
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
taosArrayPush(pTaskId, &pTask->taskId);
-
+
CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, dbFName, ctx->pVgInfo->vgId, reqType, msg, msgLen));
-#endif
+#endif
}
-
+
SRpcMsg rpcMsg = {
.msgType = reqType,
- .pCont = msg,
+ .pCont = msg,
.contLen = msgLen,
};
@@ -1163,18 +1176,18 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName));
rpcFreeCont(rpcRsp.pCont);
-
+
return TSDB_CODE_SUCCESS;
}
-
-int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableCfg **out, SCtgTask* pTask) {
- char *msg = NULL;
+int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableCfg** out,
+ SCtgTask* pTask) {
+ char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_TABLE_CFG;
- char tbFName[TSDB_TABLE_FNAME_LEN];
+ char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTableName, tbFName);
- void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
+ void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName);
SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = (char*)pTableName->tname};
@@ -1191,20 +1204,20 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, (char*)tbFName));
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
-#else
+#else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
taosArrayPush(pTaskId, &pTask->taskId);
-
+
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif
}
-
+
SRpcMsg rpcMsg = {
.msgType = reqType,
- .pCont = msg,
+ .pCont = msg,
.contLen = msgLen,
};
@@ -1214,15 +1227,15 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName));
rpcFreeCont(rpcRsp.pCont);
-
+
return TSDB_CODE_SUCCESS;
}
-int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **out, SCtgTask* pTask) {
- char *msg = NULL;
+int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, char** out, SCtgTask* pTask) {
+ char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_SERVER_VERSION;
- void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
+ void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
qDebug("try to get svr ver from mnode");
@@ -1237,20 +1250,20 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **ou
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
-#else
+#else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
taosArrayPush(pTaskId, &pTask->taskId);
-
+
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif
}
-
+
SRpcMsg rpcMsg = {
.msgType = reqType,
- .pCont = msg,
+ .pCont = msg,
.contLen = msgLen,
};
@@ -1260,8 +1273,6 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **ou
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL));
rpcFreeCont(rpcRsp.pCont);
-
+
return TSDB_CODE_SUCCESS;
}
-
-
diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c
index 1c08fafaa3..b30fce4988 100644
--- a/source/libs/executor/src/dataInserter.c
+++ b/source/libs/executor/src/dataInserter.c
@@ -225,7 +225,7 @@ int32_t dataBlockToSubmit(SDataInserterHandle* pInserter, SSubmitReq** pReq) {
}
blkHead->dataLen = htonl(dataLen);
- blkHead->numOfRows = htons(rows);
+ blkHead->numOfRows = htonl(rows);
ret->length += sizeof(SSubmitBlk) + dataLen;
blkHead = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk) + dataLen);
diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c
index 52188941b4..01670efbb1 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorimpl.c
@@ -301,7 +301,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
// too many time window in query
- if (taosHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
+ if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
+ taosHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
}
@@ -3397,7 +3398,12 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
uint32_t defaultBufsz = 0;
getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);
- int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, TD_TMP_DIR_PATH);
+ if (!osTempSpaceAvailable()) {
+ terrno = TSDB_CODE_NO_AVAIL_DISK;
+ qError("Init stream agg supporter failed since %s", terrstr(terrno));
+ return terrno;
+ }
+ int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@@ -3598,7 +3604,8 @@ void doDestroyExchangeOperatorInfo(void* param) {
}
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SNodeListNode* pValNode,
- STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType, int32_t order) {
+ STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType,
+ int32_t order) {
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pValNode);
STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey);
@@ -3635,7 +3642,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
: &((SIntervalAggOperatorInfo*)downstream->info)->interval;
- int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC)? TSDB_ORDER_ASC:TSDB_ORDER_DESC;
+ int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
int32_t type = convertFillType(pPhyFillNode->mode);
SResultInfo* pResultInfo = &pOperator->resultInfo;
@@ -3833,7 +3840,7 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum)
return TDB_CODE_SUCCESS;
}
-bool groupbyTbname(SNodeList* pGroupList) {
+bool groupbyTbname(SNodeList* pGroupList) {
bool bytbname = false;
if (LIST_LENGTH(pGroupList) > 0) {
SNode* p = nodesListGetNode(pGroupList, 0);
@@ -3875,7 +3882,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
bool assignUid = groupbyTbname(group);
int32_t groupNum = 0;
- size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
+ size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
for (int32_t i = 0; i < numOfTables; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
@@ -4608,7 +4615,7 @@ void releaseQueryBuf(size_t numOfTables) {
}
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) {
- SExplainExecInfo execInfo = {0};
+ SExplainExecInfo execInfo = {0};
SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo);
pExplainInfo->numOfRows = operatorInfo->resultInfo.totalRows;
@@ -4618,7 +4625,8 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf
pExplainInfo->verboseInfo = NULL;
if (operatorInfo->fpSet.getExplainFn) {
- int32_t code = operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen);
+ int32_t code =
+ operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen);
if (code) {
qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
return code;
@@ -4629,7 +4637,7 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf
for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pExecInfoList);
if (code != TSDB_CODE_SUCCESS) {
-// taosMemoryFreeClear(*pRes);
+ // taosMemoryFreeClear(*pRes);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
}
@@ -4659,7 +4667,12 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF
if (bufSize <= pageSize) {
bufSize = pageSize * 4;
}
- int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, TD_TMP_DIR_PATH);
+ if (!osTempSpaceAvailable()) {
+ terrno = TSDB_CODE_NO_AVAIL_DISK;
+ qError("Init stream agg supporter failed since %s", terrstr(terrno));
+ return terrno;
+ }
+ int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, tsTempDir);
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].pBuf = pSup->pResultBuf;
}
diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c
index c1e5252089..938134c167 100644
--- a/source/libs/executor/src/groupoperator.c
+++ b/source/libs/executor/src/groupoperator.c
@@ -764,7 +764,13 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
uint32_t defaultBufsz = 0;
getBufferPgSize(pResBlock->info.rowSize, &defaultPgsz, &defaultBufsz);
- int32_t code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, TD_TMP_DIR_PATH);
+ if (!osTempSpaceAvailable()) {
+ terrno = TSDB_CODE_NO_AVAIL_DISK;
+ pTaskInfo->code = terrno;
+ qError("Create partition operator info failed since %s", terrstr(terrno));
+ goto _error;
+ }
+ int32_t code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, tsTempDir);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c
index 53e25c7e97..9a7c3cf7fb 100644
--- a/source/libs/executor/src/projectoperator.c
+++ b/source/libs/executor/src/projectoperator.c
@@ -56,6 +56,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
goto _error;
}
+ pOperator->pTaskInfo = pTaskInfo;
+
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &numOfCols);
@@ -63,7 +65,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo);
pInfo->binfo.pRes = pResBlock;
- pInfo->pFinalRes = createOneDataBlock(pResBlock, false);
+ pInfo->pFinalRes = createOneDataBlock(pResBlock, false);
pInfo->pFilterNode = pProjPhyNode->node.pConditions;
pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock;
@@ -73,7 +75,6 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
pInfo->mergeDataBlocks = false;
}
-
int32_t numOfRows = 4096;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
@@ -89,12 +90,11 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols);
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols);
- pOperator->name = "ProjectOperator";
+ pOperator->name = "ProjectOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
- pOperator->blocking = false;
- pOperator->status = OP_NOT_OPENED;
- pOperator->info = pInfo;
- pOperator->pTaskInfo = pTaskInfo;
+ pOperator->blocking = false;
+ pOperator->status = OP_NOT_OPENED;
+ pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL,
destroyProjectOperatorInfo, NULL, NULL, NULL);
@@ -106,7 +106,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
return pOperator;
- _error:
+_error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
@@ -156,7 +156,8 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S
return PROJECT_RETRIEVE_DONE;
}
-static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SSDataBlock* pBlock, SOperatorInfo* pOperator) {
+static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SSDataBlock* pBlock,
+ SOperatorInfo* pOperator) {
// set current group id
pLimitInfo->currentGroupId = groupId;
@@ -170,8 +171,7 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
}
// check for the limitation in each group
- if (pLimitInfo->limit.limit >= 0 &&
- pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) {
+ if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) {
int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
blockDataKeepFirstNRows(pBlock, keepRows);
if (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) {
@@ -222,7 +222,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
}
SOperatorInfo* downstream = pOperator->pDownstream[0];
- SLimitInfo* pLimitInfo = &pProjectInfo->limitInfo;
+ SLimitInfo* pLimitInfo = &pProjectInfo->limitInfo;
if (downstream == NULL) {
return doGenerateSourceData(pOperator);
@@ -317,7 +317,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
if (pOperator->cost.openCost == 0) {
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
}
-
+
// printDataBlock1(p, "project");
return (p->info.rows > 0) ? p : NULL;
}
@@ -330,6 +330,8 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
goto _error;
}
+ pOperator->pTaskInfo = pTaskInfo;
+
SExprSupp* pSup = &pOperator->exprSupp;
SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode;
@@ -373,7 +375,6 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
- pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
destroyIndefinitOperatorInfo, NULL, NULL, NULL);
@@ -385,7 +386,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
return pOperator;
- _error:
+_error:
taosMemoryFree(pInfo);
taosMemoryFree(pOperator);
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
@@ -593,7 +594,7 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) {
pRes->info.rows = 1;
doFilter(pProjectInfo->pFilterNode, pRes, NULL);
- /*int32_t status = */doIngroupLimitOffset(&pProjectInfo->limitInfo, 0, pRes, pOperator);
+ /*int32_t status = */ doIngroupLimitOffset(&pProjectInfo->limitInfo, 0, pRes, pOperator);
pOperator->resultInfo.totalRows += pRes->info.rows;
diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c
index e371e6d9cf..16f35b1b0d 100644
--- a/source/libs/executor/src/sortoperator.c
+++ b/source/libs/executor/src/sortoperator.c
@@ -30,6 +30,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
goto _error;
}
+ pOperator->pTaskInfo = pTaskInfo;
SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc;
int32_t numOfCols = 0;
@@ -45,7 +46,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
initResultSizeInfo(&pOperator->resultInfo, 1024);
pInfo->binfo.pRes = pResBlock;
- pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);
+ pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);
pInfo->pCondition = pSortNode->node.pConditions;
pInfo->pColMatchInfo = pColMatchColInfo;
initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo);
@@ -57,7 +58,6 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
pOperator->info = pInfo;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
- pOperator->pTaskInfo = pTaskInfo;
// lazy evaluation for the following parameter since the input datablock is not known till now.
// pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2;
@@ -222,7 +222,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
}
// todo add the limit/offset info
- if (pInfo->limitInfo.remainOffset > 0) {
+ if (pInfo->limitInfo.remainOffset > 0) {
if (pInfo->limitInfo.remainOffset >= blockDataGetNumOfRows(pBlock)) {
pInfo->limitInfo.remainOffset -= pBlock->info.rows;
continue;
@@ -247,7 +247,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
}
}
- return blockDataGetNumOfRows(pBlock) > 0? pBlock:NULL;
+ return blockDataGetNumOfRows(pBlock) > 0 ? pBlock : NULL;
}
void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
@@ -474,7 +474,7 @@ void destroyGroupSortOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pSortInfo);
taosArrayDestroy(pInfo->pColMatchInfo);
-
+
taosMemoryFreeClear(param);
}
@@ -609,8 +609,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
pInfo->groupId = tsortGetGroupId(pTupleHandle);
pInfo->prefetchedTuple = NULL;
}
- }
- else {
+ } else {
pTupleHandle = tsortNextTuple(pHandle);
pInfo->groupId = 0;
}
@@ -694,7 +693,7 @@ void destroyMultiwayMergeOperatorInfo(void* param, int32_t numOfOutput) {
tsortDestroySortHandle(pInfo->pSortHandle);
taosArrayDestroy(pInfo->pSortInfo);
taosArrayDestroy(pInfo->pColMatchInfo);
-
+
taosMemoryFreeClear(param);
}
@@ -711,7 +710,7 @@ int32_t getMultiwayMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplai
}
SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numStreams,
- SMergePhysiNode* pMergePhyNode, SExecTaskInfo* pTaskInfo) {
+ SMergePhysiNode* pMergePhyNode, SExecTaskInfo* pTaskInfo) {
SPhysiNode* pPhyNode = (SPhysiNode*)pMergePhyNode;
SMultiwayMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwayMergeOperatorInfo));
diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c
index a14f554cf5..c8951c30b6 100644
--- a/source/libs/executor/src/timewindowoperator.c
+++ b/source/libs/executor/src/timewindowoperator.c
@@ -928,8 +928,9 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
SResultRow* pResult = NULL;
- STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->inputOrder);
- int32_t ret = TSDB_CODE_SUCCESS;
+ STimeWindow win =
+ getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->inputOrder);
+ int32_t ret = TSDB_CODE_SUCCESS;
if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) &&
inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) {
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
@@ -1091,8 +1092,8 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->inputOrder, scanFlag, true);
- blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
-
+ blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
+
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL);
}
@@ -1790,9 +1791,10 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
goto _error;
}
+ pOperator->pTaskInfo = pTaskInfo;
pInfo->win = pTaskInfo->window;
- pInfo->inputOrder = (pPhyNode->window.inputTsOrder == ORDER_ASC)? TSDB_ORDER_ASC:TSDB_ORDER_DESC;
- pInfo->resultTsOrder = (pPhyNode->window.outputTsOrder == ORDER_ASC)? TSDB_ORDER_ASC:TSDB_ORDER_DESC;
+ pInfo->inputOrder = (pPhyNode->window.inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
+ pInfo->resultTsOrder = (pPhyNode->window.outputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
pInfo->interval = *pInterval;
pInfo->execModel = pTaskInfo->execModel;
pInfo->twAggSup = *pTwAggSupp;
@@ -1845,7 +1847,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->exprSupp.pExprInfo = pExprInfo;
- pOperator->pTaskInfo = pTaskInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->info = pInfo;
@@ -1880,6 +1881,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
goto _error;
}
+ pOperator->pTaskInfo = pTaskInfo;
pInfo->inputOrder = TSDB_ORDER_ASC;
pInfo->interval = *pInterval;
pInfo->execModel = OPTR_EXEC_MODEL_STREAM;
@@ -1906,7 +1908,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->exprSupp.pExprInfo = pExprInfo;
- pOperator->pTaskInfo = pTaskInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->info = pInfo;
@@ -2180,7 +2181,6 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
break;
}
}
-
}
static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
@@ -2457,7 +2457,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 4096);
- int32_t numOfCols = 0;
+ int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
SSDataBlock* pResBlock = createResDataBlock(pSessionNode->window.node.pOutputDataBlockDesc);
@@ -2475,11 +2475,11 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
initResultRowInfo(&pInfo->binfo.resultRowInfo);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
- pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
- pInfo->binfo.pRes = pResBlock;
+ pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
+ pInfo->binfo.pRes = pResBlock;
pInfo->winSup.prevTs = INT64_MIN;
- pInfo->reptScan = false;
- pInfo->pCondition = pSessionNode->window.node.pConditions;
+ pInfo->reptScan = false;
+ pInfo->pCondition = pSessionNode->window.node.pConditions;
pOperator->name = "SessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION;
@@ -3028,6 +3028,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
goto _error;
}
+ pOperator->pTaskInfo = pTaskInfo;
pInfo->order = TSDB_ORDER_ASC;
pInfo->interval = (SInterval){.interval = pIntervalPhyNode->interval,
.sliding = pIntervalPhyNode->sliding,
@@ -3114,7 +3115,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->exprSupp.pExprInfo = pExprInfo;
- pOperator->pTaskInfo = pTaskInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->info = pInfo;
@@ -3155,7 +3155,7 @@ void destroyStateWinInfo(void* ptr) {
if (ptr == NULL) {
return;
}
- SStateWindowInfo* pWin = (SStateWindowInfo*) ptr;
+ SStateWindowInfo* pWin = (SStateWindowInfo*)ptr;
taosMemoryFreeClear(pWin->stateKey.pData);
}
@@ -3246,6 +3246,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
goto _error;
}
+ pOperator->pTaskInfo = pTaskInfo;
+
initResultSizeInfo(&pOperator->resultInfo, 4096);
if (pSessionNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
@@ -3308,7 +3310,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, NULL, destroyStreamSessionAggOperatorInfo,
aggEncodeResultRow, aggDecodeResultRow, NULL);
- pOperator->pTaskInfo = pTaskInfo;
if (downstream) {
initDownStream(downstream, &pInfo->streamAggSup, pInfo->gap, pInfo->twAggSup.waterMark, pOperator->operatorType);
code = appendDownstream(pOperator, &downstream, 1);
@@ -3465,7 +3466,7 @@ static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pRes
assert(pWinInfo->win.skey <= pWinInfo->win.ekey);
// too many time window in query
int32_t size = taosArrayGetSize(pAggSup->pCurWins);
- if (size > MAX_INTERVAL_TIME_WINDOW) {
+ if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH && size > MAX_INTERVAL_TIME_WINDOW) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
}
@@ -3647,8 +3648,8 @@ void deleteWindow(SArray* pWinInfos, int32_t index, FDelete fp) {
taosArrayRemove(pWinInfos, index);
}
-static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, int64_t gap,
- SArray* result, FDelete fp) {
+static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, int64_t gap, SArray* result,
+ FDelete fp) {
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startDatas = (TSKEY*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
@@ -4673,7 +4674,8 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
currTs = tsCols[currPos];
currWin.skey = currTs;
currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit,
- iaInfo->interval.precision) - 1;
+ iaInfo->interval.precision) -
+ 1;
startPos = currPos;
ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
@@ -4933,8 +4935,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
SResultRow* pResult = NULL;
- STimeWindow win =
- getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval, iaInfo->inputOrder);
+ STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
+ iaInfo->inputOrder);
int32_t ret =
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
@@ -4975,7 +4977,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
STimeWindow nextWin = win;
while (1) {
int32_t prevEndPos = forwardRows - 1 + startPos;
- startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->inputOrder);
+ startPos =
+ getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->inputOrder);
if (startPos < 0) {
break;
}
diff --git a/source/libs/executor/src/tlinearhash.c b/source/libs/executor/src/tlinearhash.c
index 00a9f3ae6c..ad97d79f7e 100644
--- a/source/libs/executor/src/tlinearhash.c
+++ b/source/libs/executor/src/tlinearhash.c
@@ -247,7 +247,12 @@ SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_
return NULL;
}
- int32_t code = createDiskbasedBuf(&pHashObj->pBuf, pageSize, inMemPages * pageSize, 0, TD_TMP_DIR_PATH);
+ if (!osTempSpaceAvailable()) {
+ terrno = TSDB_CODE_NO_AVAIL_DISK;
+ printf("tHash Init failed since %s", terrstr(terrno));
+ return NULL;
+ }
+ int32_t code = createDiskbasedBuf(&pHashObj->pBuf, pageSize, inMemPages * pageSize, 0, tsTempDir);
if (code != 0) {
terrno = code;
return NULL;
diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c
index 4f525441b9..48af951773 100644
--- a/source/libs/executor/src/tsort.c
+++ b/source/libs/executor/src/tsort.c
@@ -159,7 +159,12 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
int32_t start = 0;
if (pHandle->pBuf == NULL) {
- int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "doAddToBuf", TD_TMP_DIR_PATH);
+ if (!osTempSpaceAvailable()) {
+ terrno = TSDB_CODE_NO_AVAIL_DISK;
+ qError("Add to buf failed since %s", terrstr(terrno));
+ return terrno;
+ }
+ int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "doAddToBuf", tsTempDir);
dBufSetPrintInfo(pHandle->pBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
@@ -233,7 +238,13 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
} else {
// multi-pass internal merge sort is required
if (pHandle->pBuf == NULL) {
- code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "sortComparInit", TD_TMP_DIR_PATH);
+ if (!osTempSpaceAvailable()) {
+ terrno = TSDB_CODE_NO_AVAIL_DISK;
+ code = terrno;
+ qError("Sort compare init failed since %s", terrstr(terrno));
+ return code;
+ }
+ code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "sortComparInit", tsTempDir);
dBufSetPrintInfo(pHandle->pBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c
index dd84535eac..ae07651402 100644
--- a/source/libs/function/src/builtins.c
+++ b/source/libs/function/src/builtins.c
@@ -19,6 +19,7 @@
#include "querynodes.h"
#include "scalar.h"
#include "taoserror.h"
+#include "ttime.h"
static int32_t buildFuncErrMsg(char* pErrBuf, int32_t len, int32_t errCode, const char* pFormat, ...) {
va_list vArgList;
@@ -1442,6 +1443,58 @@ static int32_t translateIrate(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
return TSDB_CODE_SUCCESS;
}
+static int32_t translateInterp(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
+ int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
+ uint8_t dbPrec = pFunc->node.resType.precision;
+
+ if (1 != numOfParams && 3 != numOfParams && 4 != numOfParams) {
+ return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
+ }
+
+ if (3 <= numOfParams) {
+ int64_t timeVal[2] = {0};
+ for (int32_t i = 1; i < 3; ++i) {
+ uint8_t nodeType = nodeType(nodesListGetNode(pFunc->pParameterList, i));
+ uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, i))->resType.type;
+ if (!IS_VAR_DATA_TYPE(paraType) || QUERY_NODE_VALUE != nodeType) {
+ return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
+ }
+
+ SValueNode* pValue = (SValueNode*)nodesListGetNode(pFunc->pParameterList, i);
+ int32_t ret = convertStringToTimestamp(paraType, pValue->datum.p, dbPrec, &timeVal[i - 1]);
+ if (ret != TSDB_CODE_SUCCESS) {
+ return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName);
+ }
+ }
+
+ if (timeVal[0] > timeVal[1]) {
+ return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
+ "INTERP function invalid time range");
+ }
+ }
+
+ if (4 == numOfParams) {
+ uint8_t nodeType = nodeType(nodesListGetNode(pFunc->pParameterList, 3));
+ uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 3))->resType.type;
+ if (!IS_INTEGER_TYPE(paraType) || QUERY_NODE_VALUE != nodeType) {
+ return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
+ }
+
+ int32_t ret = validateTimeUnitParam(dbPrec, (SValueNode*)nodesListGetNode(pFunc->pParameterList, 3));
+ if (ret == TIME_UNIT_TOO_SMALL) {
+ return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
+ "INTERP function time interval parameter should be greater than db precision");
+ } else if (ret == TIME_UNIT_INVALID) {
+ return buildFuncErrMsg(
+ pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
+ "INTERP function time interval parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]");
+ }
+ }
+
+ pFunc->node.resType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType;
+ return TSDB_CODE_SUCCESS;
+}
+
static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
// forbid null as first/last input, since first(c0, null, 1) may have different number of input
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
@@ -2237,7 +2290,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "interp",
.type = FUNCTION_TYPE_INTERP,
.classification = FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
- .translateFunc = translateFirstLast,
+ .translateFunc = translateInterp,
.getEnvFunc = getSelectivityFuncEnv,
.initFunc = functionSetup,
.processFunc = NULL,
diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c
index 38c35e382a..e78a78e7d8 100644
--- a/source/libs/function/src/builtinsimpl.c
+++ b/source/libs/function/src/builtinsimpl.c
@@ -28,7 +28,6 @@
#define HISTOGRAM_MAX_BINS_NUM 1000
#define MAVG_MAX_POINTS_NUM 1000
-#define SAMPLE_MAX_POINTS_NUM 1000
#define TAIL_MAX_POINTS_NUM 100
#define TAIL_MAX_OFFSET 100
@@ -4898,9 +4897,7 @@ bool sampleFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo)
pInfo->numSampled = 0;
pInfo->colType = pCtx->resDataInfo.type;
pInfo->colBytes = pCtx->resDataInfo.bytes;
- if (pInfo->samples < 1 || pInfo->samples > SAMPLE_MAX_POINTS_NUM) {
- return false;
- }
+
pInfo->data = (char*)pInfo + sizeof(SSampleInfo);
pInfo->tuplePos = (STuplePos*)((char*)pInfo + sizeof(SSampleInfo) + pInfo->samples * pInfo->colBytes);
diff --git a/source/libs/function/src/tpercentile.c b/source/libs/function/src/tpercentile.c
index 057d2bc7dc..517253dc01 100644
--- a/source/libs/function/src/tpercentile.c
+++ b/source/libs/function/src/tpercentile.c
@@ -13,6 +13,7 @@
* along with this program. If not, see .
*/
+#include "taoserror.h"
#include "tglobal.h"
#include "tcompare.h"
@@ -257,7 +258,14 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
resetSlotInfo(pBucket);
- int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, "1", TD_TMP_DIR_PATH);
+ if (!osTempSpaceAvailable()) {
+ terrno = TSDB_CODE_NO_AVAIL_DISK;
+ // qError("MemBucket create disk based Buf failed since %s", terrstr(terrno));
+ tMemBucketDestroy(pBucket);
+ return NULL;
+ }
+
+ int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, "1", tsTempDir);
if (ret != 0) {
tMemBucketDestroy(pBucket);
return NULL;
diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c
index 24749729c0..1cbc78df48 100644
--- a/source/libs/function/src/udfd.c
+++ b/source/libs/function/src/udfd.c
@@ -412,22 +412,31 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
udf->outputLen = pFuncInfo->outputLen;
udf->bufSize = pFuncInfo->bufSize;
+ if (!osTempSpaceAvailable()) {
+ terrno = TSDB_CODE_NO_AVAIL_DISK;
+ msgInfo->code = terrno;
+ fnError("udfd create shared library failed since %s", terrstr(terrno));
+ goto _return;
+ }
+
char path[PATH_MAX] = {0};
#ifdef WINDOWS
- snprintf(path, sizeof(path), "%s%s.dll", TD_TMP_DIR_PATH, pFuncInfo->name);
+ snprintf(path, sizeof(path), "%s%s.dll", tsTempDir, pFuncInfo->name);
#else
- snprintf(path, sizeof(path), "%s/lib%s.so", TD_TMP_DIR_PATH, pFuncInfo->name);
+ snprintf(path, sizeof(path), "%s/lib%s.so", tsTempDir, pFuncInfo->name);
#endif
TdFilePtr file =
taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
if (file == NULL) {
fnError("udfd write udf shared library: %s failed, error: %d %s", path, errno, strerror(errno));
msgInfo->code = TSDB_CODE_FILE_CORRUPTED;
+ goto _return;
}
int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
if (count != pFuncInfo->codeSize) {
fnError("udfd write udf shared library failed");
msgInfo->code = TSDB_CODE_FILE_CORRUPTED;
+ goto _return;
}
taosCloseFile(&file);
strncpy(udf->path, path, strlen(path));
@@ -686,7 +695,6 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
buf->len = 0;
}
}
- fnDebug("allocate buf. input buf cap - len - total : %d - %d - %d", ctx->inputCap, ctx->inputLen, ctx->inputTotal);
}
bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) {
diff --git a/source/libs/parser/inc/parInsertData.h b/source/libs/parser/inc/parInsertData.h
index ea78735d5e..ddfcd3a249 100644
--- a/source/libs/parser/inc/parInsertData.h
+++ b/source/libs/parser/inc/parInsertData.h
@@ -122,7 +122,7 @@ static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks *
pBlocks->sversion = dataBuf->pTableMeta->sversion;
pBlocks->schemaLen = dataBuf->createTbReqLen;
- if (pBlocks->numOfRows + numOfRows >= INT16_MAX) {
+ if (pBlocks->numOfRows + numOfRows >= INT32_MAX) {
return TSDB_CODE_TSC_INVALID_OPERATION;
} else {
pBlocks->numOfRows += numOfRows;
diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c
index 73a4a3472a..85f73f0663 100644
--- a/source/libs/parser/src/parInsert.c
+++ b/source/libs/parser/src/parInsert.c
@@ -292,11 +292,10 @@ static void buildMsgHeader(STableDataBlocks* src, SVgDataBlocks* blocks) {
int32_t schemaLen = blk->schemaLen;
blk->uid = htobe64(blk->uid);
blk->suid = htobe64(blk->suid);
- blk->padding = htonl(blk->padding);
blk->sversion = htonl(blk->sversion);
blk->dataLen = htonl(blk->dataLen);
blk->schemaLen = htonl(blk->schemaLen);
- blk->numOfRows = htons(blk->numOfRows);
+ blk->numOfRows = htonl(blk->numOfRows);
blk = (SSubmitBlk*)(blk->data + schemaLen + dataLen);
}
}
@@ -1267,7 +1266,7 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* da
SSubmitBlk* pBlocks = (SSubmitBlk*)(dataBuf->pData);
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf, numOfRows)) {
- return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than 32767");
+ return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than INT32_MAX");
}
dataBuf->numOfTables = 1;
@@ -1339,7 +1338,7 @@ static int32_t parseDataFromFile(SInsertParseContext* pCxt, SToken filePath, STa
SSubmitBlk* pBlocks = (SSubmitBlk*)(dataBuf->pData);
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf, numOfRows)) {
- return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than 32767");
+ return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than INT32_MAX");
}
dataBuf->numOfTables = 1;
@@ -1986,7 +1985,7 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in
SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) {
- return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
+ return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than INT32_MAX");
}
return TSDB_CODE_SUCCESS;
@@ -2074,7 +2073,7 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu
SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) {
- return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
+ return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than INT32_MAX");
}
}
@@ -2444,7 +2443,7 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols
SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, rowNum)) {
- return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
+ return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than INT32_MAX");
}
return TSDB_CODE_SUCCESS;
diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c
index f53f1f27b7..d96cdca8a9 100644
--- a/source/libs/parser/src/parTranslater.c
+++ b/source/libs/parser/src/parTranslater.c
@@ -5845,11 +5845,16 @@ static int32_t createTagValFromExpr(STranslateContext* pCxt, SDataType targetDt,
}
static int32_t createTagValFromVal(STranslateContext* pCxt, SDataType targetDt, SNode* pNode, SValueNode** pVal) {
- *pVal = (SValueNode*)nodesCloneNode(pNode);
- if (NULL == *pVal) {
+ SValueNode* pTempVal = (SValueNode*)nodesCloneNode(pNode);
+ if (NULL == pTempVal) {
return TSDB_CODE_OUT_OF_MEMORY;
}
- return DEAL_RES_ERROR == translateValueImpl(pCxt, *pVal, targetDt, true) ? pCxt->errCode : TSDB_CODE_SUCCESS;
+ if (DEAL_RES_ERROR == translateValueImpl(pCxt, pTempVal, targetDt, true)) {
+ nodesDestroyNode((SNode*)pTempVal);
+ return pCxt->errCode;
+ }
+ *pVal = pTempVal;
+ return TSDB_CODE_SUCCESS;
}
static int32_t createTagVal(STranslateContext* pCxt, uint8_t precision, SSchema* pSchema, SNode* pNode,
diff --git a/source/libs/parser/test/parInsertTest.cpp b/source/libs/parser/test/parInsertTest.cpp
index 22a1be2579..f3f87e945a 100644
--- a/source/libs/parser/test/parInsertTest.cpp
+++ b/source/libs/parser/test/parInsertTest.cpp
@@ -110,15 +110,15 @@ class InsertTest : public Test {
SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
for (int32_t i = 0; i < numOfBlocks; ++i) {
cout << "Block:" << i << endl;
- cout << "\tuid:" << be64toh(blk->uid) << ", tid:" << be64toh(blk->suid) << ", padding:" << ntohl(blk->padding)
- << ", sversion:" << ntohl(blk->sversion) << ", dataLen:" << ntohl(blk->dataLen)
- << ", schemaLen:" << ntohl(blk->schemaLen) << ", numOfRows:" << ntohs(blk->numOfRows) << endl;
+ cout << "\tuid:" << be64toh(blk->uid) << ", tid:" << be64toh(blk->suid) << ", sversion:" << ntohl(blk->sversion)
+ << ", dataLen:" << ntohl(blk->dataLen) << ", schemaLen:" << ntohl(blk->schemaLen)
+ << ", numOfRows:" << ntohl(blk->numOfRows) << endl;
blk = (SSubmitBlk*)(blk->data + ntohl(blk->dataLen));
}
}
}
- void checkReslut(int32_t numOfTables, int16_t numOfRows1, int16_t numOfRows2 = -1) {
+ void checkReslut(int32_t numOfTables, int32_t numOfRows1, int32_t numOfRows2 = -1) {
SVnodeModifOpStmt* pStmt = getVnodeModifStmt(res_);
ASSERT_EQ(pStmt->payloadType, PAYLOAD_TYPE_KV);
ASSERT_EQ(pStmt->insertType, TSDB_QUERY_TYPE_INSERT);
@@ -134,7 +134,7 @@ class InsertTest : public Test {
int32_t numOfBlocks = ntohl(submit->numOfBlocks);
SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
for (int32_t i = 0; i < numOfBlocks; ++i) {
- ASSERT_EQ(ntohs(blk->numOfRows), (0 == i ? numOfRows1 : (numOfRows2 > 0 ? numOfRows2 : numOfRows1)));
+ ASSERT_EQ(ntohl(blk->numOfRows), (0 == i ? numOfRows1 : (numOfRows2 > 0 ? numOfRows2 : numOfRows1)));
blk = (SSubmitBlk*)(blk->data + ntohl(blk->dataLen));
}
}
diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c
index 0f63510b12..a7bc4df281 100644
--- a/source/libs/sync/src/syncAppendEntries.c
+++ b/source/libs/sync/src/syncAppendEntries.c
@@ -573,7 +573,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
// fsync once
SSyncLogStoreData* pData = ths->pLogStore->data;
SWal* pWal = pData->pWal;
- walFsync(pWal, true);
+ walFsync(pWal, false);
// update match index
matchIndex = pMsg->prevLogIndex + pMsg->dataCount;
@@ -694,7 +694,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
// fsync once
SSyncLogStoreData* pData = ths->pLogStore->data;
SWal* pWal = pData->pWal;
- walFsync(pWal, true);
+ walFsync(pWal, false);
}
// prepare response msg
diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c
index 0e17c1db80..bf440f04a0 100644
--- a/source/libs/sync/src/syncRaftLog.c
+++ b/source/libs/sync/src/syncRaftLog.c
@@ -206,7 +206,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
SWal* pWal = pData->pWal;
SyncIndex index = 0;
- SWalSyncInfo syncMeta;
+ SWalSyncInfo syncMeta = {0};
syncMeta.isWeek = pEntry->isWeak;
syncMeta.seqNum = pEntry->seqNum;
syncMeta.term = pEntry->term;
@@ -444,7 +444,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SWal* pWal = pData->pWal;
SyncIndex index = 0;
- SWalSyncInfo syncMeta;
+ SWalSyncInfo syncMeta = {0};
syncMeta.isWeek = pEntry->isWeak;
syncMeta.seqNum = pEntry->seqNum;
syncMeta.term = pEntry->term;
diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c
index f02c013d31..12d6797349 100644
--- a/source/libs/sync/src/syncReplication.c
+++ b/source/libs/sync/src/syncReplication.c
@@ -132,8 +132,9 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex);
SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex);
if (preLogTerm == SYNC_TERM_INVALID) {
- // SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1;
- SyncIndex newNextIndex = nextIndex + 1;
+ SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1;
+ // SyncIndex newNextIndex = nextIndex + 1;
+
syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex);
syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID);
sError("vgId:%d, sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64
@@ -224,8 +225,9 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex);
SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex);
if (preLogTerm == SYNC_TERM_INVALID) {
- // SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1;
- SyncIndex newNextIndex = nextIndex + 1;
+ SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1;
+ // SyncIndex newNextIndex = nextIndex + 1;
+
syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex);
syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID);
sError("vgId:%d, sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64
diff --git a/source/os/src/osEnv.c b/source/os/src/osEnv.c
index 6ae3d8a0c0..8f6800c7be 100644
--- a/source/os/src/osEnv.c
+++ b/source/os/src/osEnv.c
@@ -105,6 +105,10 @@ void osCleanup() {}
bool osLogSpaceAvailable() { return tsLogSpace.reserved <= tsLogSpace.size.avail; }
+bool osDataSpaceAvailable() { return tsDataSpace.reserved <= tsDataSpace.size.avail; }
+
+bool osTempSpaceAvailable() { return tsTempSpace.reserved <= tsTempSpace.size.avail; }
+
void osSetTimezone(const char *timezone) { taosSetSystemTimezone(timezone, tsTimezoneStr, &tsDaylight, &tsTimezone); }
void osSetSystemLocale(const char *inLocale, const char *inCharSet) {
diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c
index 73d89523ce..cd93509a1f 100644
--- a/source/util/src/tlog.c
+++ b/source/util/src/tlog.c
@@ -691,9 +691,16 @@ static void taosWriteLog(SLogBuff *pLogBuf) {
static void *taosAsyncOutputLog(void *param) {
SLogBuff *pLogBuf = (SLogBuff *)param;
setThreadName("log");
-
+ int32_t count = 0;
while (1) {
+ count += tsWriteInterval;
taosMsleep(tsWriteInterval);
+ if (count > 1000) {
+ osUpdate();
+ count = 0;
+ uError("Write log file failed, since log disk spase is not enough.\n %f GB, too little, require %f GB at least at least.", (double)tsLogSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsLogSpace.reserved / 1024.0 / 1024.0 / 1024.0);
+ if (!osLogSpaceAvailable()) pLogBuf->stop = 1;
+ }
// Polling the buffer
taosWriteLog(pLogBuf);
diff --git a/tests/script/tsim/sync/start3replica.sim b/tests/script/tsim/sync/start3replica.sim
new file mode 100644
index 0000000000..f66021f88a
--- /dev/null
+++ b/tests/script/tsim/sync/start3replica.sim
@@ -0,0 +1,19 @@
+system sh/stop_dnodes.sh
+system sh/deploy.sh -n dnode1 -i 1
+system sh/deploy.sh -n dnode2 -i 2
+system sh/deploy.sh -n dnode3 -i 3
+system sh/deploy.sh -n dnode4 -i 4
+
+system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
+
+system sh/exec.sh -n dnode1 -s start
+system sh/exec.sh -n dnode2 -s start
+system sh/exec.sh -n dnode3 -s start
+system sh/exec.sh -n dnode4 -s start
+
+sql connect
+sql create dnode $hostname port 7200
+sql create dnode $hostname port 7300
+sql create dnode $hostname port 7400
+
+