Merge branch '3.0' into enh/refactorBackend

This commit is contained in:
yihaoDeng 2023-11-27 10:01:12 +08:00
commit e9bec34c44
56 changed files with 1040 additions and 435 deletions

View File

@ -1,13 +0,0 @@
# bdb
ExternalProject_Add(bdb
GIT_REPOSITORY https://github.com/berkeleydb/libdb.git
GIT_TAG v5.3.28
SOURCE_DIR "${TD_CONTRIB_DIR}/bdb"
BINARY_DIR "${TD_CONTRIB_DIR}/bdb"
#BUILD_IN_SOURCE TRUE
CONFIGURE_COMMAND COMMAND ./dist/configure --enable-debug
BUILD_COMMAND "$(MAKE)"
INSTALL_COMMAND ""
TEST_COMMAND ""
)

View File

@ -1,13 +0,0 @@
# leveldb
ExternalProject_Add(leveldb
GIT_REPOSITORY https://github.com/taosdata-contrib/leveldb.git
GIT_TAG master
SOURCE_DIR "${TD_CONTRIB_DIR}/leveldb"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)

View File

@ -1,12 +0,0 @@
# lucene
ExternalProject_Add(lucene
GIT_REPOSITORY https://github.com/yihaoDeng/LucenePlusPlus.git
SOURCE_DIR "${TD_CONTRIB_DIR}/lucene"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)

View File

@ -1,12 +0,0 @@
# NuRaft
ExternalProject_Add(NuRaft
GIT_REPOSITORY https://github.com/eBay/NuRaft.git
GIT_TAG v1.3.0
SOURCE_DIR "${TD_CONTRIB_DIR}/nuraft"
BINARY_DIR "${TD_CONTRIB_DIR}/nuraft"
CONFIGURE_COMMAND "./prepare.sh"
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)

View File

@ -109,11 +109,6 @@ cat("${TD_SUPPORT_DIR}/zlib_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
# cJson
cat("${TD_SUPPORT_DIR}/cjson_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
# leveldb
if(${BUILD_WITH_LEVELDB})
cat("${TD_SUPPORT_DIR}/leveldb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_LEVELDB})
if (${BUILD_CONTRIB})
if(${BUILD_WITH_ROCKSDB})
cat("${TD_SUPPORT_DIR}/rocksdb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
@ -132,28 +127,11 @@ else()
endif()
endif()
# canonical-raft
if(${BUILD_WITH_CRAFT})
cat("${TD_SUPPORT_DIR}/craft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
SET(BUILD_WITH_UV ON CACHE BOOL "craft need libuv" FORCE)
endif(${BUILD_WITH_CRAFT})
# traft
if(${BUILD_WITH_TRAFT})
cat("${TD_SUPPORT_DIR}/traft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
SET(BUILD_WITH_UV ON CACHE BOOL "traft need libuv" FORCE)
endif(${BUILD_WITH_TRAFT})
#libuv
if(${BUILD_WITH_UV})
cat("${TD_SUPPORT_DIR}/libuv_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_UV})
# bdb
if(${BUILD_WITH_BDB})
cat("${TD_SUPPORT_DIR}/bdb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_BDB})
# sqlite
if(${BUILD_WITH_SQLITE})
cat("${TD_SUPPORT_DIR}/sqlite_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
@ -178,17 +156,6 @@ elseif(${BUILD_WITH_COS})
endif()
# lucene
if(${BUILD_WITH_LUCENE})
cat("${TD_SUPPORT_DIR}/lucene_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
add_definitions(-DUSE_LUCENE)
endif(${BUILD_WITH_LUCENE})
# NuRaft
if(${BUILD_WITH_NURAFT})
cat("${TD_SUPPORT_DIR}/nuraft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_NURAFT})
# crashdump
if(${BUILD_CRASHDUMP})
cat("${TD_SUPPORT_DIR}/crashdump_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
@ -448,23 +415,6 @@ elseif(${BUILD_WITH_COS})
endif()
# lucene
# To support build on ubuntu: sudo apt-get install libboost-all-dev
if(${BUILD_WITH_LUCENE})
option(ENABLE_TEST "Enable the tests" OFF)
add_subdirectory(lucene EXCLUDE_FROM_ALL)
target_include_directories(
lucene++
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/lucene/include>
)
endif(${BUILD_WITH_LUCENE})
# NuRaft
if(${BUILD_WITH_NURAFT})
add_subdirectory(nuraft EXCLUDE_FROM_ALL)
endif(${BUILD_WITH_NURAFT})
# pthread
if(${BUILD_PTHREAD})
if ("${CMAKE_BUILD_TYPE}" STREQUAL "")
@ -537,30 +487,6 @@ if(${BUILD_WCWIDTH})
SET_TARGET_PROPERTIES(wcwidth PROPERTIES OUTPUT_NAME wcwidth)
endif(${BUILD_WCWIDTH})
# CRAFT
if(${BUILD_WITH_CRAFT})
add_library(craft STATIC IMPORTED GLOBAL)
set_target_properties(craft PROPERTIES
IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/craft/.libs/libraft.a"
INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/craft/include"
)
# target_link_libraries(craft
# INTERFACE pthread
# )
endif(${BUILD_WITH_CRAFT})
# TRAFT
if(${BUILD_WITH_TRAFT})
add_library(traft STATIC IMPORTED GLOBAL)
set_target_properties(traft PROPERTIES
IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/traft/.libs/libraft.a"
INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/traft/include"
)
# target_link_libraries(craft
# INTERFACE pthread
# )
endif(${BUILD_WITH_TRAFT})
# LIBUV
if(${BUILD_WITH_UV})
if (TD_WINDOWS)
@ -572,18 +498,6 @@ if(${BUILD_WITH_UV})
add_subdirectory(libuv EXCLUDE_FROM_ALL)
endif(${BUILD_WITH_UV})
# BDB
if(${BUILD_WITH_BDB})
add_library(bdb STATIC IMPORTED GLOBAL)
set_target_properties(bdb PROPERTIES
IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/bdb/libdb.a"
INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/bdb"
)
target_link_libraries(bdb
INTERFACE pthread
)
endif(${BUILD_WITH_BDB})
# SQLite
# see https://stackoverflow.com/questions/8774593/cmake-link-to-external-library#comment58570736_10550334
if(${BUILD_WITH_SQLITE})

View File

@ -3271,7 +3271,7 @@ typedef struct {
SMsgHead head;
int64_t streamId;
int32_t taskId;
} SVPauseStreamTaskReq, SVResetStreamTaskReq;
} SVPauseStreamTaskReq, SVResetStreamTaskReq, SVDropHTaskReq;
typedef struct {
int8_t reserved;

View File

@ -272,6 +272,7 @@ enum { // WARN: new msg should be appended to segment tail
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_PAUSE, "stream-task-pause", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESUME, "stream-task-resume", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_STOP, "stream-task-stop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_HTASK_DROP, "stream-htask-drop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_MON_MSG)

View File

@ -666,17 +666,19 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea
typedef struct STaskStatusEntry {
STaskId id;
int32_t status;
int32_t statusLastDuration; // to record the last duration of current status
int64_t stage;
int32_t nodeId;
int64_t verStart; // start version in WAL, only valid for source task
int64_t verEnd; // end version in WAL, only valid for source task
int64_t processedVer; // only valid for source task
int64_t activeCheckpointId; // current active checkpoint id
bool checkpointFailed; // denote if the checkpoint is failed or not
double inputQUsed; // in MiB
int64_t verStart; // start version in WAL, only valid for source task
int64_t verEnd; // end version in WAL, only valid for source task
int64_t processedVer; // only valid for source task
int32_t relatedHTask; // has related fill-history task
int64_t activeCheckpointId; // current active checkpoint id
bool checkpointFailed; // denote if the checkpoint is failed or not
double inputQUsed; // in MiB
double inputRate;
double sinkQuota; // existed quota size for sink task
double sinkDataSize; // sink to dest data size
double sinkQuota; // existed quota size for sink task
double sinkDataSize; // sink to dst data size
} STaskStatusEntry;
typedef struct SStreamHbMsg {

View File

@ -124,6 +124,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_INVALID_CFG_VALUE TAOS_DEF_ERROR_CODE(0, 0x0133)
#define TSDB_CODE_IP_NOT_IN_WHITE_LIST TAOS_DEF_ERROR_CODE(0, 0x0134)
#define TSDB_CODE_FAILED_TO_CONNECT_S3 TAOS_DEF_ERROR_CODE(0, 0x0135)
//client
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200)

View File

@ -555,6 +555,9 @@ static void *tscCrashReportThreadFp(void *param) {
if (pFile) {
taosReleaseCrashLogFile(pFile, false);
pFile = NULL;
taosMsleep(sleepTime);
loopTimes = 0;
continue;
}
} else {

View File

@ -1216,7 +1216,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
}
int32_t retryCnt = 0;
while (TSDB_CODE_MND_CONSUMER_NOT_READY == syncAskEp(tmq)) {
while (syncAskEp(tmq) != 0) {
if (retryCnt++ > MAX_RETRY_COUNT) {
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes", tmq->consumerId);
code = TSDB_CODE_MND_CONSUMER_NOT_READY;
@ -1454,6 +1454,9 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
if (epoch < tmq->epoch || (epoch == tmq->epoch && topicNumGet == 0)) {
tscInfo("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d",
tmq->consumerId, tmq->epoch, epoch, topicNumGet);
if(atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER){
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
}
return false;
}
@ -1965,9 +1968,14 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
return NULL;
}
while (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
while (1) {
if(atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__RECOVER){
break;
}
tscInfo("consumer:0x%" PRIx64 " tmq status is recover", tmq->consumerId);
int32_t retryCnt = 0;
while (TSDB_CODE_MND_CONSUMER_NOT_READY == syncAskEp(tmq)) {
while (syncAskEp(tmq) != 0) {
if (retryCnt++ > 40) {
return NULL;
}

View File

@ -892,6 +892,8 @@ long s3Size(const char *object_name) {
if ((cbd.status != S3StatusOK) && (cbd.status != S3StatusErrorPreconditionFailed)) {
s3PrintError(__FILE__, __LINE__, __func__, cbd.status, cbd.err_msg);
return -1;
}
size = cbd.content_length;

View File

@ -127,6 +127,9 @@ static void *dmCrashReportThreadFp(void *param) {
if (pFile) {
taosReleaseCrashLogFile(pFile, false);
pFile = NULL;
taosMsleep(sleepTime);
loopTimes = 0;
continue;
}
} else {

View File

@ -84,6 +84,7 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_HTASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;

View File

@ -281,8 +281,8 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
if (pVnode != NULL) {
SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, req.vgId, false);
if (pVnode != NULL && (req.replica == 1 || !pVnode->failed)) {
dError("vgId:%d, already exist", req.vgId);
tFreeSCreateVnodeReq(&req);
vmReleaseVnode(pMgmt, pVnode);
@ -833,6 +833,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_HTASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -1014,6 +1014,20 @@ static int32_t mndFindSuperTableColumnIndex(const SStbObj *pStb, const char *col
return -1;
}
static bool mndValidateSchema(SSchema *pSchemas, int32_t nSchema, SArray *pFields, int32_t maxLen) {
int32_t rowLen = 0;
for (int32_t i = 0; i < nSchema; ++i) {
rowLen += (pSchemas + i)->bytes;
}
int32_t nField = taosArrayGetSize(pFields);
for (int32_t i = 0; i < nField; ++i) {
rowLen += ((SField *)TARRAY_GET_ELEM(pFields, i))->bytes;
}
return rowLen <= maxLen;
}
static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq *createReq) {
taosRLockLatch(&pStb->lock);
memcpy(pDst, pStb, sizeof(SStbObj));
@ -1269,6 +1283,11 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p
return -1;
}
if (!mndValidateSchema(pOld->pTags, pOld->numOfTags, pFields, TSDB_MAX_TAGS_LEN)) {
terrno = TSDB_CODE_PAR_INVALID_TAGS_LENGTH;
return -1;
}
pNew->numOfTags = pNew->numOfTags + ntags;
if (mndAllocStbSchemas(pOld, pNew) != 0) {
return -1;
@ -1558,6 +1577,16 @@ static int32_t mndAlterStbTagBytes(SMnode *pMnode, const SStbObj *pOld, SStbObj
return -1;
}
uint32_t nLen = 0;
for (int32_t i = 0; i < pOld->numOfTags; ++i) {
nLen += (pOld->pTags[i].colId == colId) ? pField->bytes : pOld->pTags[i].bytes;
}
if (nLen > TSDB_MAX_TAGS_LEN) {
terrno = TSDB_CODE_PAR_INVALID_TAGS_LENGTH;
return -1;
}
if (mndAllocStbSchemas(pOld, pNew) != 0) {
return -1;
}
@ -1592,6 +1621,11 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray
return -1;
}
if (!mndValidateSchema(pOld->pColumns, pOld->numOfColumns, pFields, TSDB_MAX_BYTES_PER_ROW)) {
terrno = TSDB_CODE_PAR_INVALID_ROW_LENGTH;
return -1;
}
pNew->numOfColumns = pNew->numOfColumns + ncols;
if (mndAllocStbSchemas(pOld, pNew) != 0) {
return -1;

View File

@ -75,6 +75,8 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static SArray *extractNodeListFromStream(SMnode *pMnode);
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
static SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId);
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg);
@ -1604,6 +1606,123 @@ static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
sdbCancelFetch(pSdb, pIter);
}
static void setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows) {
SColumnInfoData *pColInfo;
int32_t cols = 0;
// stream name
char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
// task id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char idstr[128] = {0};
int32_t len = tintToHex(pTask->id.taskId, &idstr[4]);
idstr[2] = '0';
idstr[3] = 'x';
varDataSetLen(idstr, len + 2);
colDataSetVal(pColInfo, numOfRows, idstr, false);
// node type
char nodeType[20 + VARSTR_HEADER_SIZE] = {0};
varDataSetLen(nodeType, 5);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
if (pTask->info.nodeId > 0) {
memcpy(varDataVal(nodeType), "vnode", 5);
} else {
memcpy(varDataVal(nodeType), "snode", 5);
}
colDataSetVal(pColInfo, numOfRows, nodeType, false);
// node id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
int64_t nodeId = TMAX(pTask->info.nodeId, 0);
colDataSetVal(pColInfo, numOfRows, (const char *)&nodeId, false);
// level
char level[20 + VARSTR_HEADER_SIZE] = {0};
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
memcpy(varDataVal(level), "source", 6);
varDataSetLen(level, 6);
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
memcpy(varDataVal(level), "agg", 3);
varDataSetLen(level, 3);
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
memcpy(varDataVal(level), "sink", 4);
varDataSetLen(level, 4);
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)level, false);
// status
char status[20 + VARSTR_HEADER_SIZE] = {0};
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
if (pe == NULL) {
return;
}
const char *pStatus = streamTaskGetStatusStr(pe->status);
STR_TO_VARSTR(status, pStatus);
// status
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)status, false);
// stage
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pe->stage, false);
// input queue
char vbuf[30] = {0};
char buf[25] = {0};
const char *queueInfoStr = "%4.2fMiB (%5.2f%)";
sprintf(buf, queueInfoStr, pe->inputQUsed, pe->inputRate);
STR_TO_VARSTR(vbuf, buf);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
// output queue
// sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate);
// STR_TO_VARSTR(vbuf, buf);
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false);
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
const char *sinkStr = "%.2fMiB";
sprintf(buf, sinkStr, pe->sinkDataSize);
} else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
// offset info
const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]";
sprintf(buf, offsetStr, pe->processedVer, pe->verStart, pe->verEnd);
}
STR_TO_VARSTR(vbuf, buf);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
}
static int32_t getNumOfTasks(SArray *pTaskList) {
int32_t numOfLevels = taosArrayGetSize(pTaskList);
int32_t count = 0;
for (int32_t i = 0; i < numOfLevels; i++) {
SArray *pLevel = taosArrayGetP(pTaskList, i);
count += taosArrayGetSize(pLevel);
}
return count;
}
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
SMnode * pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb;
@ -1619,137 +1738,25 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
// lock
taosRLockLatch(&pStream->lock);
// count task num
int32_t sz = taosArrayGetSize(pStream->tasks);
int32_t count = 0;
for (int32_t i = 0; i < sz; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
count += taosArrayGetSize(pLevel);
}
int32_t count = getNumOfTasks(pStream->tasks);
if (numOfRows + count > rowsCapacity) {
blockDataEnsureCapacity(pBlock, numOfRows + count);
}
// add row for each task
for (int32_t i = 0; i < sz; i++) {
for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
int32_t levelCnt = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < levelCnt; j++) {
int32_t numOfLevels = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfLevels; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
SColumnInfoData *pColInfo;
int32_t cols = 0;
// stream name
char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
// task id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char idstr[128] = {0};
int32_t len = tintToHex(pTask->id.taskId, &idstr[4]);
idstr[2] = '0';
idstr[3] = 'x';
varDataSetLen(idstr, len + 2);
colDataSetVal(pColInfo, numOfRows, idstr, false);
// node type
char nodeType[20 + VARSTR_HEADER_SIZE] = {0};
varDataSetLen(nodeType, 5);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
if (pTask->info.nodeId > 0) {
memcpy(varDataVal(nodeType), "vnode", 5);
} else {
memcpy(varDataVal(nodeType), "snode", 5);
}
colDataSetVal(pColInfo, numOfRows, nodeType, false);
// node id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
int64_t nodeId = TMAX(pTask->info.nodeId, 0);
colDataSetVal(pColInfo, numOfRows, (const char *)&nodeId, false);
// level
char level[20 + VARSTR_HEADER_SIZE] = {0};
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
memcpy(varDataVal(level), "source", 6);
varDataSetLen(level, 6);
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
memcpy(varDataVal(level), "agg", 3);
varDataSetLen(level, 3);
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
memcpy(varDataVal(level), "sink", 4);
varDataSetLen(level, 4);
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)level, false);
// status
char status[20 + VARSTR_HEADER_SIZE] = {0};
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
if (pe == NULL) {
continue;
}
const char *pStatus = streamTaskGetStatusStr(pe->status);
STR_TO_VARSTR(status, pStatus);
// status
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)status, false);
// stage
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pe->stage, false);
// input queue
char vbuf[30] = {0};
char buf[25] = {0};
const char *queueInfoStr = "%4.2fMiB (%5.2f%)";
sprintf(buf, queueInfoStr, pe->inputQUsed, pe->inputRate);
STR_TO_VARSTR(vbuf, buf);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
// output queue
// sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate);
// STR_TO_VARSTR(vbuf, buf);
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false);
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
const char *sinkStr = "%.2fMiB";
sprintf(buf, sinkStr, pe->sinkDataSize);
} else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
// offset info
const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]";
sprintf(buf, offsetStr, pe->processedVer, pe->verStart, pe->verEnd);
}
STR_TO_VARSTR(vbuf, buf);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows);
numOfRows++;
}
}
// unlock
taosRUnLockLatch(&pStream->lock);
sdbRelease(pSdb, pStream);
}
@ -2762,7 +2769,7 @@ int32_t killActiveCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t le
return TSDB_CODE_SUCCESS;
}
int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) {
static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) {
STrans *pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans != NULL) {
mInfo("kill checkpoint transId:%d to reset task status", transId);
@ -2797,6 +2804,91 @@ int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) {
return 0;
}
static SStreamTask *mndGetStreamTask(STaskId *pId, SStreamObj *pStream) {
for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
int32_t numOfLevels = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfLevels; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
if (pTask->id.taskId == pId->taskId) {
return pTask;
}
}
}
return NULL;
}
static bool needDropRelatedFillhistoryTask(STaskStatusEntry *pTaskEntry, SStreamExecInfo *pExecNode) {
if (pTaskEntry->status == TASK_STATUS__STREAM_SCAN_HISTORY && pTaskEntry->statusLastDuration >= 10) {
if (fabs(pTaskEntry->inputQUsed) <= DBL_EPSILON) {
int32_t numOfReady = 0;
int32_t numOfTotal = 0;
for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
if (pTaskEntry->id.streamId == pId->streamId) {
numOfTotal++;
if (pTaskEntry->id.taskId != pId->taskId) {
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
if (pEntry->status == TASK_STATUS__READY) {
numOfReady++;
}
}
}
}
if (numOfReady > 0) {
mDebug("stream:0x%" PRIx64
" %d tasks are ready, %d tasks in stream-scan-history for more than 50s, drop related fill-history task",
pTaskEntry->id.streamId, numOfReady, numOfTotal - numOfReady);
return true;
} else {
return false;
}
}
}
return false;
}
// currently only handle the sink task
// 1. sink task, drop related fill-history task msg is missing
// 2. other tasks are in ready state for at least 3 * hb_interval
static int32_t mndDropRelatedFillhistoryTask(SMnode *pMnode, STaskStatusEntry *pTaskEntry, SStreamObj *pStream) {
SStreamTask *pTask = mndGetStreamTask(&pTaskEntry->id, pStream);
if (pTask == NULL) {
mError("failed to get the stream task:0x%x, may have been dropped", (int32_t)pTaskEntry->id.taskId);
return -1;
}
SVDropHTaskReq *pReq = rpcMallocCont(sizeof(SVDropHTaskReq));
if (pReq == NULL) {
mError("failed to malloc in drop related fill-history task, size:%" PRIzu ", code:%s", sizeof(SVDropHTaskReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SRpcMsg msg = {.info.noResp = 1};
initRpcMsg(&msg, TDMT_STREAM_HTASK_DROP, pReq, sizeof(SVDropHTaskReq));
mDebug("build and send drop related fill-history task for task:0x%x", pTask->id.taskId);
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);
tmsgSendReq(&epset, &msg);
return TSDB_CODE_SUCCESS;
}
int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
int32_t num = taosArrayGetSize(pNodeList);
mInfo("set node expired for %d nodes", num);
@ -2897,9 +2989,28 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
}
pTaskEntry->status = p->status;
if (p->status == pTaskEntry->status) {
pTaskEntry->statusLastDuration++;
} else {
pTaskEntry->status = p->status;
pTaskEntry->statusLastDuration = 0;
}
if (p->status != TASK_STATUS__READY) {
mDebug("received s-task:0x%" PRIx64 " not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status));
if (p->status == TASK_STATUS__STREAM_SCAN_HISTORY) {
bool drop = needDropRelatedFillhistoryTask(pTaskEntry, &execInfo);
if (drop) {
SStreamObj *pStreamObj = mndGetStreamObj(pMnode, pTaskEntry->id.streamId);
if (pStreamObj == NULL) {
mError("failed to acquire the streamObj:0x%" PRIx64 " it may have been dropped", pStreamObj->uid);
} else {
mndDropRelatedFillhistoryTask(pMnode, pTaskEntry, pStreamObj);
mndReleaseStream(pMnode, pStreamObj);
}
}
}
}
}
@ -2931,3 +3042,17 @@ void freeCheckpointCandEntry(void *param) {
SCheckpointCandEntry *pEntry = param;
taosMemoryFreeClear(pEntry->pName);
}
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) {
void * pIter = NULL;
SSdb * pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
if (pStream->uid == streamId) {
sdbCancelFetch(pSdb, pIter);
return pStream;
}
}
return NULL;
}

View File

@ -79,6 +79,7 @@ struct SMeta {
char* path;
SVnode* pVnode;
bool changed;
TDB* pEnv;
TXN* txn;
TTB* pTbDb;

View File

@ -232,6 +232,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg);
int32_t tqStartStreamTaskAsync(STQ* pTq, bool restart);
int32_t tqRestartStreamTasks(STQ* pTq);

View File

@ -56,7 +56,7 @@ int metaPrepareAsyncCommit(SMeta *pMeta) {
code = ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn);
metaULock(pMeta);
code = tdbCommit(pMeta->pEnv, pMeta->txn);
pMeta->changed = false;
return code;
}

View File

@ -251,6 +251,7 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
++pMeta->pVnode->config.vndStats.numOfSTables;
pMeta->changed = true;
metaDebug("vgId:%d, stb:%s is created, suid:%" PRId64, TD_VID(pMeta->pVnode), pReq->name, pReq->suid);
return 0;
@ -325,6 +326,8 @@ _drop_super_table:
metaUpdTimeSeriesNum(pMeta);
pMeta->changed = true;
_exit:
tdbFree(pKey);
tdbFree(pData);
@ -424,6 +427,8 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
metaTimeSeriesNotifyCheck(pMeta);
}
pMeta->changed = true;
_exit:
if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf);
tDecoderClear(&dc);
@ -847,6 +852,7 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs
}
}
pMeta->changed = true;
metaDebug("vgId:%d, table:%s uid %" PRId64 " is created, type:%" PRId8, TD_VID(pMeta->pVnode), pReq->name, pReq->uid,
pReq->type);
return 0;
@ -895,6 +901,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi
*tbUid = uid;
}
pMeta->changed = true;
_exit:
tdbFree(pData);
return rc;
@ -938,6 +945,8 @@ void metaDropTables(SMeta *pMeta, SArray *tbUids) {
}
}
tSimpleHashCleanup(suidHash);
pMeta->changed = true;
}
static int32_t metaFilterTableByHash(SMeta *pMeta, SArray *uidList) {
@ -1233,6 +1242,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
if (pAlterTbReq->colName == NULL) {
terrno = TSDB_CODE_INVALID_MSG;
metaError("meta/table: null pAlterTbReq->colName");
return -1;
}
@ -1300,20 +1310,27 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
SMetaEntry oldEntry = {.type = TSDB_NORMAL_TABLE, .uid = entry.uid};
oldEntry.ntbEntry.schemaRow.nCols = pSchema->nCols;
int32_t iCol = 0;
int32_t rowLen = -1;
if (pAlterTbReq->action == TSDB_ALTER_TABLE_ADD_COLUMN ||
pAlterTbReq->action == TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES) {
rowLen = 0;
}
int32_t iCol = 0, jCol = 0;
SSchema *qColumn = NULL;
for (;;) {
pColumn = NULL;
qColumn = NULL;
if (iCol >= pSchema->nCols) break;
pColumn = &pSchema->pSchema[iCol];
if (jCol >= pSchema->nCols) break;
qColumn = &pSchema->pSchema[jCol];
if (NULL == pAlterTbReq->colName) {
metaError("meta/table: null pAlterTbReq->colName");
return -1;
if (!pColumn && (strcmp(qColumn->name, pAlterTbReq->colName) == 0)) {
pColumn = qColumn;
iCol = jCol;
if (rowLen < 0) break;
}
if (strcmp(pColumn->name, pAlterTbReq->colName) == 0) break;
iCol++;
rowLen += qColumn->bytes;
++jCol;
}
entry.version = version;
@ -1328,6 +1345,10 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
if ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) < 0) {
goto _err;
}
if (rowLen + pAlterTbReq->bytes > TSDB_MAX_BYTES_PER_ROW) {
terrno = TSDB_CODE_PAR_INVALID_ROW_LENGTH;
goto _err;
}
pSchema->version++;
pSchema->nCols++;
pNewSchema = taosMemoryMalloc(sizeof(SSchema) * pSchema->nCols);
@ -1369,10 +1390,14 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
terrno = TSDB_CODE_VND_COL_NOT_EXISTS;
goto _err;
}
if (!IS_VAR_DATA_TYPE(pColumn->type) || pColumn->bytes > pAlterTbReq->colModBytes) {
if (!IS_VAR_DATA_TYPE(pColumn->type) || pColumn->bytes >= pAlterTbReq->colModBytes) {
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
goto _err;
}
if (rowLen + pAlterTbReq->colModBytes - pColumn->bytes > TSDB_MAX_BYTES_PER_ROW) {
terrno = TSDB_CODE_PAR_INVALID_ROW_LENGTH;
goto _err;
}
if (tqCheckColModifiable(pMeta->pVnode->pTq, uid, pColumn->colId) != 0) {
terrno = TSDB_CODE_VND_COL_SUBSCRIBED;
goto _err;
@ -1970,6 +1995,7 @@ _err:
}
int metaAlterTable(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pMetaRsp) {
pMeta->changed = true;
switch (pReq->action) {
case TSDB_ALTER_TABLE_ADD_COLUMN:
case TSDB_ALTER_TABLE_DROP_COLUMN:

View File

@ -2021,4 +2021,35 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
}
int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg) {
SVDropHTaskReq* pReq = (SVDropHTaskReq*)pMsg->pCont;
SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
if (pTask == NULL) {
tqError("vgId:%d process drop fill-history task req, failed to acquire task:0x%x, it may have been dropped already",
pMeta->vgId, pReq->taskId);
return TSDB_CODE_SUCCESS;
}
tqDebug("s-task:%s receive drop fill-history msg from mnode", pTask->id.idStr);
if (pTask->hTaskInfo.id.taskId == 0) {
tqError("vgId:%d s-task:%s not have related fill-history task", pMeta->vgId, pTask->id.idStr);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
ASSERT(status == TASK_STATUS__STREAM_SCAN_HISTORY);
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
SStreamTaskId id = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId};
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &id);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}

View File

@ -421,7 +421,7 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
if (mtime < committer->ctx->now - tsS3UploadDelaySec) {
committer->ctx->skipTsRow = true;
}
} else if (s3Size(object_name) > 0) {
} else /*if (s3Size(object_name) > 0) */ {
committer->ctx->skipTsRow = true;
}
}

View File

@ -45,7 +45,7 @@ static int32_t tsdbDataFileReadHeadFooter(SDataFileReader *reader) {
int32_t ftype = TSDB_FTYPE_HEAD;
if (reader->fd[ftype]) {
code = tsdbReadFile(reader->fd[ftype], reader->config->files[ftype].file.size - sizeof(SHeadFooter),
(uint8_t *)reader->headFooter, sizeof(SHeadFooter));
(uint8_t *)reader->headFooter, sizeof(SHeadFooter), 0);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -67,7 +67,7 @@ static int32_t tsdbDataFileReadTombFooter(SDataFileReader *reader) {
int32_t ftype = TSDB_FTYPE_TOMB;
if (reader->fd[ftype]) {
code = tsdbReadFile(reader->fd[ftype], reader->config->files[ftype].file.size - sizeof(STombFooter),
(uint8_t *)reader->tombFooter, sizeof(STombFooter));
(uint8_t *)reader->tombFooter, sizeof(STombFooter), 0);
TSDB_CHECK_CODE(code, lino, _exit);
}
reader->ctx->tombFooterLoaded = true;
@ -161,7 +161,7 @@ int32_t tsdbDataFileReadBrinBlk(SDataFileReader *reader, const TBrinBlkArray **b
}
code = tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], reader->headFooter->brinBlkPtr->offset, data,
reader->headFooter->brinBlkPtr->size);
reader->headFooter->brinBlkPtr->size, 0);
if (code) {
taosMemoryFree(data);
TSDB_CHECK_CODE(code, lino, _exit);
@ -191,7 +191,8 @@ int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinB
code = tRealloc(&reader->config->bufArr[0], brinBlk->dp->size);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], brinBlk->dp->offset, reader->config->bufArr[0], brinBlk->dp->size);
code =
tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], brinBlk->dp->offset, reader->config->bufArr[0], brinBlk->dp->size, 0);
TSDB_CHECK_CODE(code, lino, _exit);
int32_t size = 0;
@ -232,7 +233,8 @@ int32_t tsdbDataFileReadBlockData(SDataFileReader *reader, const SBrinRecord *re
code = tRealloc(&reader->config->bufArr[0], record->blockSize);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, reader->config->bufArr[0], record->blockSize);
code =
tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, reader->config->bufArr[0], record->blockSize, 0);
TSDB_CHECK_CODE(code, lino, _exit);
code = tDecmprBlockData(reader->config->bufArr[0], record->blockSize, bData, &reader->config->bufArr[1]);
@ -257,8 +259,8 @@ int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRe
code = tRealloc(&reader->config->bufArr[0], record->blockKeySize);
TSDB_CHECK_CODE(code, lino, _exit);
code =
tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, reader->config->bufArr[0], record->blockKeySize);
code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, reader->config->bufArr[0], record->blockKeySize,
0);
TSDB_CHECK_CODE(code, lino, _exit);
// hdr
@ -296,10 +298,46 @@ int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRe
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset + record->blockKeySize,
reader->config->bufArr[0], hdr->szBlkCol);
reader->config->bufArr[0], hdr->szBlkCol, 0);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t szHint = 0;
if (bData->nColData > 3) {
int64_t offset = 0;
SBlockCol bc = {.cid = 0};
SBlockCol *blockCol = &bc;
size = 0;
SColData *colData = tBlockDataGetColDataByIdx(bData, 0);
while (blockCol && blockCol->cid < colData->cid) {
if (size < hdr->szBlkCol) {
size += tGetBlockCol(reader->config->bufArr[0] + size, blockCol);
} else {
ASSERT(size == hdr->szBlkCol);
blockCol = NULL;
}
}
if (blockCol && blockCol->flag == HAS_VALUE) {
offset = blockCol->offset;
SColData *colDataEnd = tBlockDataGetColDataByIdx(bData, bData->nColData - 1);
while (blockCol && blockCol->cid < colDataEnd->cid) {
if (size < hdr->szBlkCol) {
size += tGetBlockCol(reader->config->bufArr[0] + size, blockCol);
} else {
ASSERT(size == hdr->szBlkCol);
blockCol = NULL;
}
}
if (blockCol && blockCol->flag == HAS_VALUE) {
szHint = blockCol->offset + blockCol->szBitmap + blockCol->szOffset + blockCol->szValue - offset;
}
}
}
SBlockCol bc[1] = {{.cid = 0}};
SBlockCol *blockCol = bc;
@ -338,7 +376,7 @@ int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRe
code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA],
record->blockOffset + record->blockKeySize + hdr->szBlkCol + blockCol->offset,
reader->config->bufArr[1], size1);
reader->config->bufArr[1], size1, i > 0 ? 0 : szHint);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDecmprColData(reader->config->bufArr[1], blockCol, hdr->cmprAlg, hdr->nRow, colData,
@ -366,7 +404,7 @@ int32_t tsdbDataFileReadBlockSma(SDataFileReader *reader, const SBrinRecord *rec
code = tRealloc(&reader->config->bufArr[0], record->smaSize);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd[TSDB_FTYPE_SMA], record->smaOffset, reader->config->bufArr[0], record->smaSize);
code = tsdbReadFile(reader->fd[TSDB_FTYPE_SMA], record->smaOffset, reader->config->bufArr[0], record->smaSize, 0);
TSDB_CHECK_CODE(code, lino, _exit);
// decode sma data
@ -405,7 +443,7 @@ int32_t tsdbDataFileReadTombBlk(SDataFileReader *reader, const TTombBlkArray **t
}
code = tsdbReadFile(reader->fd[TSDB_FTYPE_TOMB], reader->tombFooter->tombBlkPtr->offset, data,
reader->tombFooter->tombBlkPtr->size);
reader->tombFooter->tombBlkPtr->size, 0);
if (code) {
taosMemoryFree(data);
TSDB_CHECK_CODE(code, lino, _exit);
@ -435,7 +473,8 @@ int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombB
code = tRealloc(&reader->config->bufArr[0], tombBlk->dp->size);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd[TSDB_FTYPE_TOMB], tombBlk->dp->offset, reader->config->bufArr[0], tombBlk->dp->size);
code =
tsdbReadFile(reader->fd[TSDB_FTYPE_TOMB], tombBlk->dp->offset, reader->config->bufArr[0], tombBlk->dp->size, 0);
TSDB_CHECK_CODE(code, lino, _exit);
int32_t size = 0;
@ -488,8 +527,8 @@ struct SDataFileWriter {
STombBlock tombBlock[1];
int32_t tombBlockIdx;
// range
SVersionRange range;
SVersionRange tombRange;
SVersionRange range;
SVersionRange tombRange;
} ctx[1];
STFile files[TSDB_FTYPE_MAX];

View File

@ -34,7 +34,7 @@ typedef struct SFDataPtr {
extern int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbFD **ppFD);
extern void tsdbCloseFile(STsdbFD **ppFD);
extern int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size);
extern int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size);
extern int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int64_t szHint);
extern int32_t tsdbFsyncFile(STsdbFD *pFD);
#ifdef __cplusplus

View File

@ -901,7 +901,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
if (mtime < now - tsS3UploadDelaySec) {
skipMerge = true;
}
} else if (s3Size(object_name) > 0) {
} else /* if (s3Size(object_name) > 0) */ {
skipMerge = true;
}
}

View File

@ -568,7 +568,7 @@ static int32_t tsdbMerge(void *arg) {
if (mtime < now - tsS3UploadDelaySec) {
skipMerge = true;
}
} else if (s3Size(object_name) > 0) {
} else /* if (s3Size(object_name) > 0) */ {
skipMerge = true;
}
}

View File

@ -26,7 +26,17 @@ static int32_t tsdbOpenFileImpl(STsdbFD *pFD) {
if (pFD->pFD == NULL) {
int errsv = errno;
const char *object_name = taosDirEntryBaseName((char *)path);
long s3_size = tsS3Enabled ? s3Size(object_name) : 0;
long s3_size = 0;
if (tsS3Enabled) {
long size = s3Size(object_name);
if (size < 0) {
code = terrno = TSDB_CODE_FAILED_TO_CONNECT_S3;
goto _exit;
}
s3_size = size;
}
if (tsS3Enabled && !strncmp(path + strlen(path) - 5, ".data", 5) && s3_size > 0) {
#ifndef S3_BLOCK_CACHE
s3EvictCache(path, s3_size);
@ -48,6 +58,7 @@ static int32_t tsdbOpenFileImpl(STsdbFD *pFD) {
// pFD->szFile = s3_size;
#endif
} else {
tsdbInfo("no file: %s", path);
code = TAOS_SYSTEM_ERROR(errsv);
// taosMemoryFree(pFD);
goto _exit;
@ -283,7 +294,7 @@ _exit:
return code;
}
static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int64_t szHint) {
int32_t code = 0;
int64_t n = 0;
int32_t szPgCont = PAGE_CONTENT_SIZE(pFD->szPage);
@ -330,7 +341,7 @@ static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64
memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);
n += nRead;
pgno++;
++pgno;
bOffset = 0;
}
@ -339,7 +350,12 @@ static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64
uint8_t *pBlock = NULL;
int64_t retrieve_offset = PAGE_OFFSET(pgno, pFD->szPage);
int64_t pgnoEnd = pgno - 1 + (bOffset + size - n + szPgCont - 1) / szPgCont;
int64_t retrieve_size = (pgnoEnd - pgno + 1) * pFD->szPage;
if (szHint > 0) {
pgnoEnd = pgno - 1 + (bOffset + szHint - n + szPgCont - 1) / szPgCont;
}
int64_t retrieve_size = (pgnoEnd - pgno + 1) * pFD->szPage;
code = s3GetObjectBlock(pFD->objName, retrieve_offset, retrieve_size, 1, &pBlock);
if (code != TSDB_CODE_SUCCESS) {
goto _exit;
@ -350,6 +366,10 @@ static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64
for (int i = 0; i < nPage; ++i) {
tsdbCacheSetPageS3(pFD->pTsdb->pgCache, pFD, pgno, pBlock + i * pFD->szPage);
if (szHint > 0 && n >= size) {
++pgno;
continue;
}
memcpy(pFD->pBuf, pBlock + i * pFD->szPage, pFD->szPage);
// check
@ -364,7 +384,7 @@ static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64
memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);
n += nRead;
pgno++;
++pgno;
bOffset = 0;
}
@ -375,7 +395,7 @@ _exit:
return code;
}
int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int64_t szHint) {
int32_t code = 0;
if (!pFD->pFD) {
code = tsdbOpenFileImpl(pFD);
@ -385,7 +405,7 @@ int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size)
}
if (pFD->s3File && tsS3BlockSize < 0) {
return tsdbReadFileS3(pFD, offset, pBuf, size);
return tsdbReadFileS3(pFD, offset, pBuf, size, szHint);
} else {
return tsdbReadFileImp(pFD, offset, pBuf, size);
}
@ -1141,7 +1161,7 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) {
if (code) goto _err;
// read
code = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size);
code = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size, 0);
if (code) goto _err;
// decode
@ -1178,7 +1198,7 @@ int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk) {
if (code) goto _err;
// read
code = tsdbReadFile(pReader->aSttFD[iStt], offset, pReader->aBuf[0], size);
code = tsdbReadFile(pReader->aSttFD[iStt], offset, pReader->aBuf[0], size, 0);
if (code) goto _err;
// decode
@ -1211,7 +1231,7 @@ int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *m
if (code) goto _err;
// read
code = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size);
code = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size, 0);
if (code) goto _err;
// decode
@ -1242,7 +1262,7 @@ int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pDataBlk, SArray *aCol
if (code) goto _err;
// read
code = tsdbReadFile(pReader->pSmaFD, pSmaInfo->offset, pReader->aBuf[0], pSmaInfo->size);
code = tsdbReadFile(pReader->pSmaFD, pSmaInfo->offset, pReader->aBuf[0], pSmaInfo->size, 0);
if (code) goto _err;
// decode
@ -1276,7 +1296,7 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo
code = tRealloc(&pReader->aBuf[0], pBlkInfo->szKey);
if (code) goto _err;
code = tsdbReadFile(pFD, pBlkInfo->offset, pReader->aBuf[0], pBlkInfo->szKey);
code = tsdbReadFile(pFD, pBlkInfo->offset, pReader->aBuf[0], pBlkInfo->szKey, 0);
if (code) goto _err;
SDiskDataHdr hdr;
@ -1322,7 +1342,7 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo
code = tRealloc(&pReader->aBuf[0], hdr.szBlkCol);
if (code) goto _err;
code = tsdbReadFile(pFD, offset, pReader->aBuf[0], hdr.szBlkCol);
code = tsdbReadFile(pFD, offset, pReader->aBuf[0], hdr.szBlkCol, 0);
if (code) goto _err;
}
@ -1366,7 +1386,7 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo
code = tRealloc(&pReader->aBuf[1], size);
if (code) goto _err;
code = tsdbReadFile(pFD, offset, pReader->aBuf[1], size);
code = tsdbReadFile(pFD, offset, pReader->aBuf[1], size, 0);
if (code) goto _err;
code = tsdbDecmprColData(pReader->aBuf[1], pBlockCol, hdr.cmprAlg, hdr.nRow, pColData, &pReader->aBuf[2]);
@ -1392,7 +1412,7 @@ int32_t tsdbReadDataBlockEx(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockDat
if (code) goto _err;
// read
code = tsdbReadFile(pReader->pDataFD, pBlockInfo->offset, pReader->aBuf[0], pBlockInfo->szBlock);
code = tsdbReadFile(pReader->pDataFD, pBlockInfo->offset, pReader->aBuf[0], pBlockInfo->szBlock, 0);
if (code) goto _err;
// decmpr
@ -1444,7 +1464,7 @@ int32_t tsdbReadSttBlockEx(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk
TSDB_CHECK_CODE(code, lino, _exit);
// read
code = tsdbReadFile(pReader->aSttFD[iStt], pSttBlk->bInfo.offset, pReader->aBuf[0], pSttBlk->bInfo.szBlock);
code = tsdbReadFile(pReader->aSttFD[iStt], pSttBlk->bInfo.offset, pReader->aBuf[0], pSttBlk->bInfo.szBlock, 0);
TSDB_CHECK_CODE(code, lino, _exit);
// decmpr
@ -1700,7 +1720,7 @@ int32_t tsdbReadDelDatav1(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelDa
if (code) goto _err;
// read
code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size);
code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size, 0);
if (code) goto _err;
// // decode
@ -1740,7 +1760,7 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx) {
if (code) goto _err;
// read
code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size);
code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size, 0);
if (code) goto _err;
// decode

View File

@ -60,7 +60,7 @@ int32_t tsdbSttFileReaderOpen(const char *fname, const SSttFileReaderConfig *con
int64_t offset = config->file->size - sizeof(SSttFooter);
ASSERT(offset >= TSDB_FHDR_SIZE);
code = tsdbReadFile(reader[0]->fd, offset, (uint8_t *)(reader[0]->footer), sizeof(SSttFooter));
code = tsdbReadFile(reader[0]->fd, offset, (uint8_t *)(reader[0]->footer), sizeof(SSttFooter), 0);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
@ -97,7 +97,7 @@ int32_t tsdbSttFileReadStatisBlk(SSttFileReader *reader, const TStatisBlkArray *
if (!data) return TSDB_CODE_OUT_OF_MEMORY;
int32_t code =
tsdbReadFile(reader->fd, reader->footer->statisBlkPtr->offset, data, reader->footer->statisBlkPtr->size);
tsdbReadFile(reader->fd, reader->footer->statisBlkPtr->offset, data, reader->footer->statisBlkPtr->size, 0);
if (code) {
taosMemoryFree(data);
return code;
@ -125,7 +125,7 @@ int32_t tsdbSttFileReadTombBlk(SSttFileReader *reader, const TTombBlkArray **tom
if (!data) return TSDB_CODE_OUT_OF_MEMORY;
int32_t code =
tsdbReadFile(reader->fd, reader->footer->tombBlkPtr->offset, data, reader->footer->tombBlkPtr->size);
tsdbReadFile(reader->fd, reader->footer->tombBlkPtr->offset, data, reader->footer->tombBlkPtr->size, 0);
if (code) {
taosMemoryFree(data);
return code;
@ -152,7 +152,8 @@ int32_t tsdbSttFileReadSttBlk(SSttFileReader *reader, const TSttBlkArray **sttBl
void *data = taosMemoryMalloc(reader->footer->sttBlkPtr->size);
if (!data) return TSDB_CODE_OUT_OF_MEMORY;
int32_t code = tsdbReadFile(reader->fd, reader->footer->sttBlkPtr->offset, data, reader->footer->sttBlkPtr->size);
int32_t code =
tsdbReadFile(reader->fd, reader->footer->sttBlkPtr->offset, data, reader->footer->sttBlkPtr->size, 0);
if (code) {
taosMemoryFree(data);
return code;
@ -177,7 +178,7 @@ int32_t tsdbSttFileReadBlockData(SSttFileReader *reader, const SSttBlk *sttBlk,
code = tRealloc(&reader->config->bufArr[0], sttBlk->bInfo.szBlock);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd, sttBlk->bInfo.offset, reader->config->bufArr[0], sttBlk->bInfo.szBlock);
code = tsdbReadFile(reader->fd, sttBlk->bInfo.offset, reader->config->bufArr[0], sttBlk->bInfo.szBlock, 0);
TSDB_CHECK_CODE(code, lino, _exit);
code = tDecmprBlockData(reader->config->bufArr[0], sttBlk->bInfo.szBlock, bData, &reader->config->bufArr[1]);
@ -209,7 +210,7 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk *
code = tRealloc(&reader->config->bufArr[0], sttBlk->bInfo.szKey);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd, sttBlk->bInfo.offset, reader->config->bufArr[0], sttBlk->bInfo.szKey);
code = tsdbReadFile(reader->fd, sttBlk->bInfo.offset, reader->config->bufArr[0], sttBlk->bInfo.szKey, 0);
TSDB_CHECK_CODE(code, lino, _exit);
// hdr
@ -255,7 +256,7 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk *
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd, sttBlk->bInfo.offset + sttBlk->bInfo.szKey, reader->config->bufArr[0],
hdr->szBlkCol);
hdr->szBlkCol, 0);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -296,7 +297,7 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk *
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd, sttBlk->bInfo.offset + sttBlk->bInfo.szKey + hdr->szBlkCol + blockCol->offset,
reader->config->bufArr[1], size1);
reader->config->bufArr[1], size1, 0);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDecmprColData(reader->config->bufArr[1], blockCol, hdr->cmprAlg, hdr->nRow, colData,
@ -321,7 +322,7 @@ int32_t tsdbSttFileReadTombBlock(SSttFileReader *reader, const STombBlk *tombBlk
code = tRealloc(&reader->config->bufArr[0], tombBlk->dp->size);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd, tombBlk->dp->offset, reader->config->bufArr[0], tombBlk->dp->size);
code = tsdbReadFile(reader->fd, tombBlk->dp->offset, reader->config->bufArr[0], tombBlk->dp->size, 0);
if (code) TSDB_CHECK_CODE(code, lino, _exit);
int64_t size = 0;
@ -352,7 +353,7 @@ int32_t tsdbSttFileReadStatisBlock(SSttFileReader *reader, const SStatisBlk *sta
code = tRealloc(&reader->config->bufArr[0], statisBlk->dp->size);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd, statisBlk->dp->offset, reader->config->bufArr[0], statisBlk->dp->size);
code = tsdbReadFile(reader->fd, statisBlk->dp->offset, reader->config->bufArr[0], statisBlk->dp->size, 0);
TSDB_CHECK_CODE(code, lino, _exit);
int64_t size = 0;
@ -405,7 +406,7 @@ struct SSttFileWriter {
};
static int32_t tsdbFileDoWriteSttBlockData(STsdbFD *fd, SBlockData *blockData, int8_t cmprAlg, int64_t *fileSize,
TSttBlkArray *sttBlkArray, uint8_t **bufArr, SVersionRange *range) {
TSttBlkArray *sttBlkArray, uint8_t **bufArr, SVersionRange *range) {
if (blockData->nRow == 0) return 0;
int32_t code = 0;

View File

@ -13,9 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "meta.h"
#include "sync.h"
#include "vnd.h"
#include "vnodeInt.h"
#include "sync.h"
extern int32_t tsdbPreCommit(STsdb *pTsdb);
extern int32_t tsdbCommitBegin(STsdb *pTsdb, SCommitInfo *pInfo);
@ -155,7 +156,8 @@ int vnodeShouldCommit(SVnode *pVnode, bool atExit) {
taosThreadMutexLock(&pVnode->mutex);
if (pVnode->inUse && diskAvail) {
needCommit = (pVnode->inUse->size > pVnode->inUse->node.size) || (pVnode->inUse->size > 0 && atExit);
needCommit = (pVnode->inUse->size > pVnode->inUse->node.size) ||
(atExit && (pVnode->inUse->size > 0 || pVnode->pMeta->changed));
}
taosThreadMutexUnlock(&pVnode->mutex);
return needCommit;

View File

@ -595,6 +595,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
tqProcessTaskResetReq(pVnode->pTq, pMsg);
}
} break;
case TDMT_STREAM_HTASK_DROP: {
if (pVnode->restored && vnodeIsLeader(pVnode)) {
tqProcessTaskDropHTask(pVnode->pTq, pMsg);
}
} break;
case TDMT_VND_ALTER_CONFIRM:
needCommit = pVnode->config.hashChange;
if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) {

View File

@ -503,15 +503,15 @@ static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock**
T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
}
qError("dynamic post task begin");
qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
*ppRes = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam);
if (*ppRes) {
pPost->isStarted = true;
pStbJoin->execInfo.postBlkNum++;
pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
qError("join res block retrieved");
qDebug("%s join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
} else {
qError("Empty join res block retrieved");
qDebug("%s Empty join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
}
}

View File

@ -1525,6 +1525,17 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
return TSDB_CODE_SUCCESS;
}
static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup;
blockDataCleanup(pInfo->pCreateTbRes);
if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) {
pBlock->info.parTbName[0] = 0;
} else {
appendCreateTableRow(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup,
pBlock->info.id.groupId, pBlock, 0, pInfo->pCreateTbRes, &pInfo->stateStore);
}
}
static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
blockDataCleanup(pDestBlock);
int32_t rows = pSrcBlock->info.rows;
@ -1549,15 +1560,21 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
uint64_t srcUid = srcUidData[i];
uint64_t groupId = srcGp[i];
char* tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN] = {0};
char tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN] = {0};
if (groupId == 0) {
groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], ver);
}
if (pInfo->tbnameCalSup.pExprInfo) {
void* parTbname = NULL;
pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname);
memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
code = pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname);
if (code != TSDB_CODE_SUCCESS) {
SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, srcStartTsCol[i], srcStartTsCol[i], ver);
printDataBlock(pPreRes, "pre res", GET_TASKID(pInfo->pStreamScanOp->pTaskInfo));
calBlockTbName(pInfo, pPreRes);
memcpy(varDataVal(tbname), pPreRes->info.parTbName, strlen(pPreRes->info.parTbName));
} else {
memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
}
varDataSetLen(tbname, strlen(varDataVal(tbname)));
pInfo->stateStore.streamStateFreeVal(parTbname);
}
@ -1583,17 +1600,6 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock,
return code;
}
static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup;
blockDataCleanup(pInfo->pCreateTbRes);
if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) {
pBlock->info.parTbName[0] = 0;
} else {
appendCreateTableRow(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup,
pBlock->info.id.groupId, pBlock, 0, pInfo->pCreateTbRes, &pInfo->stateStore);
}
}
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
uint64_t* pGp, void* pTbName) {
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);

View File

@ -23,6 +23,7 @@ extern "C" {
#include "parToken.h"
#include "parUtil.h"
#include "parser.h"
#include "cmdnodes.h"
typedef struct STranslateContext {
SParseContext* pParseCxt;
@ -46,7 +47,9 @@ typedef struct STranslateContext {
SNode* pPostRoot;
} STranslateContext;
bool biRewriteToTbnameFunc(STranslateContext* pCxt, SNode** ppNode);
int32_t biRewriteSelectStar(STranslateContext* pCxt, SSelectStmt* pSelect);
int32_t biCheckCreateTableTbnameCol(STranslateContext* pCxt, SCreateTableStmt* pStmt);
int32_t findTable(STranslateContext* pCxt, const char* pTableAlias, STableNode** pOutput);
int32_t getTargetMetaImpl(SParseContext* pParCxt, SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta, bool couldBeView);

View File

@ -263,6 +263,7 @@ static const SSysTableShowAdapter sysTableShowAdapter[] = {
static int32_t translateSubquery(STranslateContext* pCxt, SNode* pNode);
static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode);
static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal);
static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode** pFunc);
static int32_t createSimpleSelectStmtFromProjList(const char* pDb, const char* pTable, SNodeList* pProjectionList,
SSelectStmt** pStmt);
static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta, SNode** pQuery);
@ -1091,6 +1092,12 @@ static EDealRes translateColumnUseAlias(STranslateContext* pCxt, SColumnNode** p
return DEAL_RES_CONTINUE;
}
#ifndef TD_ENTERPRISE
bool biRewriteToTbnameFunc(STranslateContext* pCxt, SNode** ppNode) {
return false;
}
#endif
static EDealRes translateColumn(STranslateContext* pCxt, SColumnNode** pCol) {
if (NULL == pCxt->pCurrStmt ||
(isSelectStmt(pCxt->pCurrStmt) && NULL == ((SSelectStmt*)pCxt->pCurrStmt)->pFromTable)) {
@ -1102,6 +1109,13 @@ static EDealRes translateColumn(STranslateContext* pCxt, SColumnNode** pCol) {
return DEAL_RES_CONTINUE;
}
if (pCxt->pParseCxt->biMode) {
SNode** ppNode = (SNode**)pCol;
if (biRewriteToTbnameFunc(pCxt, ppNode)) {
return translateFunction(pCxt, (SFunctionNode**)ppNode);
}
}
EDealRes res = DEAL_RES_CONTINUE;
if ('\0' != (*pCol)->tableAlias[0]) {
res = translateColumnWithPrefix(pCxt, pCol);
@ -2212,7 +2226,8 @@ static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode** pFunc
SNode* pParam = NULL;
FOREACH(pParam, (*pFunc)->pParameterList) {
if (isMultiResFunc(pParam)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)pParam)->aliasName);
pCxt->errCode = TSDB_CODE_FUNC_FUNTION_PARA_NUM;
return DEAL_RES_ERROR;
}
}
@ -5712,6 +5727,12 @@ static int32_t checkTableDeleteMarkOption(STranslateContext* pCxt, STableOptions
return code;
}
#ifndef TD_ENTERPRISE
int32_t biCheckCreateTableTbnameCol(STranslateContext* pCxt, SCreateTableStmt* pStmt) {
return TSDB_CODE_SUCCESS;
}
#endif
static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt, bool createStable) {
if (NULL != strchr(pStmt->tableName, '.')) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME,
@ -5750,7 +5771,9 @@ static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt
"configured with the 'TTL' option");
}
}
if (pCxt->pParseCxt->biMode != 0 && TSDB_CODE_SUCCESS == code) {
code = biCheckCreateTableTbnameCol(pCxt, pStmt);
}
return code;
}

View File

@ -383,7 +383,7 @@ TEST_F(ParserSelectTest, semanticCheck) {
run("SELECT LAST(*) + SUM(c1) FROM t1", TSDB_CODE_PAR_WRONG_VALUE_TYPE);
run("SELECT CEIL(LAST(ts, c1)) FROM t1", TSDB_CODE_PAR_WRONG_VALUE_TYPE);
run("SELECT CEIL(LAST(ts, c1)) FROM t1", TSDB_CODE_FUNC_FUNTION_PARA_NUM);
// TSDB_CODE_PAR_ILLEGAL_USE_AGG_FUNCTION
run("SELECT c2 FROM t1 tt1 join t1 tt2 on COUNT(*) > 0", TSDB_CODE_PAR_ILLEGAL_USE_AGG_FUNCTION);

View File

@ -1457,13 +1457,29 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
}
case 604800000: { /* 1w */
if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS) {
timeVal = timeVal / 1000 / 604800 * 604800 * 1000;
if (ignoreTz) {
timeVal = timeVal - (timeVal + offsetFromTz(timezone, 1000)) % (((int64_t)604800) * 1000);
} else {
timeVal = timeVal / 1000 / 604800 * 604800 * 1000;
}
} else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) {
timeVal = timeVal / 1000000 / 604800 * 604800 * 1000000;
if (ignoreTz) {
timeVal = timeVal - (timeVal + offsetFromTz(timezone, 1000000)) % (((int64_t)604800) * 1000000);
} else {
timeVal = timeVal / 1000000 / 604800 * 604800 * 1000000;
}
} else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) {
timeVal = timeVal / 1000000000 / 604800 * 604800 * 1000000000;
if (ignoreTz) {
timeVal = timeVal - (timeVal + offsetFromTz(timezone, 1000000000)) % (((int64_t)604800) * 1000000000);
} else {
timeVal = timeVal / 1000000000 / 604800 * 604800 * 1000000000;
}
} else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) {
timeVal = timeVal * factor / factor / 604800 * 604800 * factor;
if (ignoreTz) {
timeVal = timeVal - (timeVal + offsetFromTz(timezone, 1)) % (((int64_t)604800L) * factor);
} else {
timeVal = timeVal * factor / factor / 604800 * 604800 * factor;
}
} else {
colDataSetNULL(pOutput->columnData, i);
continue;

View File

@ -122,31 +122,37 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32
int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
int32_t* blockSize);
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem);
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size);
const char* streamQueueItemGetTypeStr(int32_t type);
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer,
int32_t* pLen);
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq);
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
void streamClearChkptReadyMsg(SStreamTask* pTask);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
STaskId streamTaskExtractKey(const SStreamTask* pTask);
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer,
int32_t* pLen);
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
void streamMetaResetStartInfo(STaskStartInfo* pMeta);
void streamClearChkptReadyMsg(SStreamTask* pTask);
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
int32_t* blockSize);
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem);
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size);
const char* streamQueueItemGetTypeStr(int32_t type);
SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq);
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
// <<<<<<< HEAD
// void streamClearChkptReadyMsg(SStreamTask* pTask);
// int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const
// char*); STaskId streamTaskExtractKey(const SStreamTask* pTask); void streamTaskInitForLaunchHTask(SHistoryTaskInfo*
// pInfo); void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
// void streamMetaResetStartInfo(STaskStartInfo* pMeta);
// =======
// >>>>>>> 3.0
SStreamQueue* streamQueueOpen(int64_t cap);
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
void streamQueueProcessSuccess(SStreamQueue* queue);
@ -168,8 +174,8 @@ int deleteCheckpoint(char* id);
int deleteCheckpointFile(char* id, char* name);
int downloadCheckpointByName(char* id, char* fname, char* dstName);
int32_t onNormalTaskReady(SStreamTask* pTask);
int32_t onScanhistoryTaskReady(SStreamTask* pTask);
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask);
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask);
typedef int32_t (*__stream_async_exec_fn_t)(void* param);

View File

@ -158,7 +158,7 @@ int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubm
}
// todo handle memory error
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) {
SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) {
terrno = 0;
if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) {

View File

@ -958,7 +958,9 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
}
void streamClearChkptReadyMsg(SStreamTask* pTask) {
if (pTask->pReadyMsgList == NULL) return;
if (pTask->pReadyMsgList == NULL) {
return;
}
for (int i = 0; i < taosArrayGetSize(pTask->pReadyMsgList); i++) {
SStreamChkptReadyInfo* pInfo = taosArrayGet(pTask->pReadyMsgList, i);

View File

@ -401,7 +401,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// 5. save to disk
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL);
// 6. pause allowed.
// 6. add empty delete block
if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) {
SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);

View File

@ -221,7 +221,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
*pInput = qItem;
} else {
// merge current block failed, let's handle the already merged blocks.
void* newRet = streamMergeQueueItem(*pInput, qItem);
void* newRet = streamQueueMergeQueueItem(*pInput, qItem);
if (newRet == NULL) {
if (terrno != 0) {
stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,

View File

@ -323,7 +323,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
}
}
int32_t onNormalTaskReady(SStreamTask* pTask) {
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
streamTaskSetReady(pTask);
@ -348,7 +348,7 @@ int32_t onNormalTaskReady(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t onScanhistoryTaskReady(SStreamTask* pTask) {
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
// set the state to be ready

View File

@ -452,7 +452,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
// 2MiB per second for sink task
// 50 times sink operator per second
streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 50, 50, tsSinkDataRate, pTask->id.idStr);
streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr);
TdThreadMutexAttr attr = {0};
int code = taosThreadMutexAttrInit(&attr);

View File

@ -457,11 +457,11 @@ void doInitStateTransferTable(void) {
streamTaskSMTrans = taosArrayInit(8, sizeof(STaskStateTrans));
// initialization event handle
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, onNormalTaskReady, false, false);
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, streamTaskOnNormalTaskReady, false, false);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false);
taosArrayPush(streamTaskSMTrans, &trans);
// scan-history related event

View File

@ -28,10 +28,12 @@
static int32_t httpRefMgt = 0;
static int64_t httpRef = -1;
static int32_t FAST_FAILURE_LIMIT = 120;
typedef struct SHttpModule {
uv_loop_t* loop;
SAsyncPool* asyncPool;
TdThread thread;
SHashObj* connStatusTable;
} SHttpModule;
typedef struct SHttpMsg {
@ -64,6 +66,8 @@ static void httpHandleReq(SHttpMsg* msg);
static void httpHandleQuit(SHttpMsg* msg);
static int32_t httpSendQuit();
static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port);
static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, int8_t succ);
static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag);
@ -193,11 +197,20 @@ static void httpAsyncCb(uv_async_t* handle) {
SHttpMsg *msg = NULL, *quitMsg = NULL;
queue wq;
QUEUE_INIT(&wq);
static int32_t BATCH_SIZE = 5;
int32_t count = 0;
taosThreadMutexLock(&item->mtx);
QUEUE_MOVE(&item->qmsg, &wq);
while (!QUEUE_IS_EMPTY(&item->qmsg) && count++ < BATCH_SIZE) {
queue* h = QUEUE_HEAD(&item->qmsg);
QUEUE_REMOVE(h);
QUEUE_PUSH(&wq, h);
}
taosThreadMutexUnlock(&item->mtx);
int count = 0;
while (!QUEUE_IS_EMPTY(&wq)) {
queue* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h);
@ -262,14 +275,20 @@ static void clientSentCb(uv_write_t* req, int32_t status) {
}
}
static void clientConnCb(uv_connect_t* req, int32_t status) {
SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef);
SHttpClient* cli = req->data;
if (status != 0) {
httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0);
tError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
}
taosReleaseRef(httpRefMgt, httpRef);
return;
}
httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 1);
status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb);
if (0 != status) {
tError("http-report failed to send data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
@ -277,6 +296,7 @@ static void clientConnCb(uv_connect_t* req, int32_t status) {
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
}
}
taosReleaseRef(httpRefMgt, httpRef);
}
int32_t httpSendQuit() {
@ -349,16 +369,51 @@ static void httpHandleQuit(SHttpMsg* msg) {
uv_walk(http->loop, httpWalkCb, NULL);
taosReleaseRef(httpRefMgt, httpRef);
}
static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port) {
char buf[256] = {0};
sprintf(buf, "%s:%d", server, port);
int32_t* failedTime = (int32_t*)taosHashGet(pTable, buf, strlen(buf));
if (failedTime == NULL) {
return false;
}
int32_t now = taosGetTimestampSec();
if (*failedTime > now - FAST_FAILURE_LIMIT) {
tDebug("http-report succ to ignore msg,reason:connection timed out, dst:%s", buf);
return true;
} else {
return false;
}
}
static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, int8_t succ) {
char buf[256] = {0};
sprintf(buf, "%s:%d", server, port);
if (succ) {
taosHashRemove(pTable, buf, strlen(buf));
} else {
int32_t st = taosGetTimestampSec();
taosHashPut(pTable, buf, strlen(buf), &st, sizeof(st));
}
return;
}
static void httpHandleReq(SHttpMsg* msg) {
int32_t ignore = false;
SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef);
if (http == NULL) {
goto END;
}
if (httpFailFastShoudIgnoreMsg(http->connStatusTable, msg->server, msg->port)) {
ignore = true;
goto END;
}
struct sockaddr_in dest = {0};
if (taosBuildDstAddr(msg->server, msg->port, &dest) < 0) {
goto END;
}
if (msg->flag == HTTP_GZIP) {
int32_t dstLen = taosCompressHttpRport(msg->cont, msg->len);
if (dstLen > 0) {
@ -399,11 +454,11 @@ static void httpHandleReq(SHttpMsg* msg) {
uv_tcp_init(http->loop, &cli->tcp);
// set up timeout to avoid stuck;
int32_t fd = taosCreateSocketWithTimeout(5);
int32_t fd = taosCreateSocketWithTimeout(5000);
if (fd < 0) {
tError("http-report failed to open socket, dst:%s:%d", cli->addr, cli->port);
taosReleaseRef(httpRefMgt, httpRef);
destroyHttpClient(cli);
taosReleaseRef(httpRefMgt, httpRef);
return;
}
int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd);
@ -418,13 +473,16 @@ static void httpHandleReq(SHttpMsg* msg) {
if (ret != 0) {
tError("http-report failed to connect to http-server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr,
cli->port);
httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0);
destroyHttpClient(cli);
}
taosReleaseRef(httpRefMgt, httpRef);
return;
END:
tError("http-report failed to report, reason: %s, addr: %s:%d", terrstr(), msg->server, msg->port);
if (ignore == false) {
tError("http-report failed to report, reason: %s, addr: %s:%d", terrstr(), msg->server, msg->port);
}
httpDestroyMsg(msg);
taosReleaseRef(httpRefMgt, httpRef);
}
@ -441,6 +499,8 @@ static void transHttpEnvInit() {
SHttpModule* http = taosMemoryMalloc(sizeof(SHttpModule));
http->loop = taosMemoryMalloc(sizeof(uv_loop_t));
http->connStatusTable = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
uv_loop_init(http->loop);
http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb);
@ -474,6 +534,8 @@ void transHttpEnvDestroy() {
uv_loop_close(load->loop);
taosMemoryFree(load->loop);
taosHashCleanup(load->connStatusTable);
taosReleaseRef(httpRefMgt, httpRef);
taosRemoveRef(httpRefMgt, httpRef);
}

View File

@ -2,6 +2,7 @@ add_executable(transportTest "")
add_executable(transUT "")
add_executable(svrBench "")
add_executable(cliBench "")
add_executable(httpBench "")
target_sources(transUT
PRIVATE
@ -21,6 +22,10 @@ target_sources(cliBench
PRIVATE
"cliBench.c"
)
target_sources(httpBench
PRIVATE
"http_test.c"
)
target_include_directories(transportTest
PUBLIC
@ -51,11 +56,6 @@ target_include_directories(transUT
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(svrBench
PUBLIC
"${TD_SOURCE_DIR}/include/libs/transport"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(svrBench
PUBLIC
"${TD_SOURCE_DIR}/include/libs/transport"
@ -75,7 +75,8 @@ target_include_directories(cliBench
"${TD_SOURCE_DIR}/include/libs/transport"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(cliBench
target_include_directories(httpBench
PUBLIC
"${TD_SOURCE_DIR}/include/libs/transport"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
@ -89,6 +90,14 @@ target_link_libraries (cliBench
transport
)
target_link_libraries(httpBench
os
util
common
gtest_main
transport
)
add_test(
NAME transUT
COMMAND transUT

View File

@ -0,0 +1,70 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "taoserror.h"
#include "tglobal.h"
#include "thttp.h"
#include "transLog.h"
#include "trpc.h"
#include "tutil.h"
#include "tversion.h"
void initLogEnv() {
const char * logDir = "/tmp/trans_cli";
const char * defaultLogFileNamePrefix = "taoslog";
const int32_t maxLogFileNum = 1000000;
tsAsyncLog = 0;
// rpcDebugflag = 143;
strcpy(tsLogDir, (char *)logDir);
taosRemoveDir(tsLogDir);
taosMkDir(tsLogDir);
if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
printf("failed to open log file in directory:%s\n", tsLogDir);
}
}
typedef struct TThread {
TdThread thread;
int idx;
} TThread;
void *proces(void *arg) {
char *monitor = "172.26.10.94";
while (1) {
int32_t len = 512;
char * msg = taosMemoryCalloc(1, len);
memset(msg, 1, len);
int32_t code = taosSendHttpReport(monitor, "/crash", 6050, msg, 10, HTTP_FLAT);
taosMemoryFree(msg);
taosUsleep(10);
}
}
int main(int argc, char *argv[]) {
initLogEnv();
int32_t numOfThreads = 10;
TThread *thread = taosMemoryCalloc(1, sizeof(TThread) * numOfThreads);
for (int i = 0; i < numOfThreads; i++) {
thread[i].idx = i;
taosThreadCreate(&(thread[i].thread), NULL, proces, (void *)&thread[i]);
}
while (1) {
taosMsleep(5000);
}
taosCloseLog();
return 0;
}

View File

@ -280,8 +280,7 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
#endif
}
/* ----------------------------------------------Bool Compression
* ---------------------------------------------- */
/* ----------------------------------------------Bool Compression ---------------------------------------------- */
// TODO: You can also implement it using RLE method.
int32_t tsCompressBoolImp(const char *const input, const int32_t nelements, char *const output) {
int32_t pos = -1;
@ -387,8 +386,7 @@ int32_t tsDecompressBoolRLEImp(const char *const input, const int32_t nelements,
}
#endif
/* ----------------------------------------------String Compression
* ---------------------------------------------- */
/* ----------------------------------------------String Compression ---------------------------------------------- */
// Note: the size of the output must be larger than input_size + 1 and
// LZ4_compressBound(size) + 1;
// >= max(input_size, LZ4_compressBound(input_size)) + 1;
@ -430,8 +428,7 @@ int32_t tsDecompressStringImp(const char *const input, int32_t compressedSize, c
}
}
/* --------------------------------------------Timestamp Compression
* ---------------------------------------------- */
/* --------------------------------------------Timestamp Compression ---------------------------------------------- */
// TODO: Take care here, we assumes little endian encoding.
int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements, char *const output) {
int32_t _pos = 1;
@ -751,8 +748,7 @@ int32_t tsDecompressDoubleImp(const char *const input, const int32_t nelements,
return nelements * DOUBLE_BYTES;
}
/* --------------------------------------------Float Compression
* ---------------------------------------------- */
/* --------------------------------------------Float Compression ---------------------------------------------- */
void encodeFloatValue(uint32_t diff, uint8_t flag, char *const output, int32_t *const pos) {
uint8_t nbytes = (flag & INT8MASK(3)) + 1;
int32_t nshift = (FLOAT_BYTES * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3);

View File

@ -50,7 +50,7 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
const char *ip = input + 1;
int32_t count = 0;
int32_t _pos = 0;
int64_t prev_value = 0;
int64_t prevValue = 0;
#if __AVX2__
while (1) {
@ -80,13 +80,13 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
if (selector == 0 || selector == 1) {
if (tsSIMDEnable && tsAVX2Enable) {
for (int32_t i = 0; i < batch; ++i) {
__m256i prev = _mm256_set1_epi64x(prev_value);
__m256i prev = _mm256_set1_epi64x(prevValue);
_mm256_storeu_si256((__m256i *)&p[_pos], prev);
_pos += 4;
}
for (int32_t i = 0; i < remain; ++i) {
p[_pos++] = prev_value;
p[_pos++] = prevValue;
}
} else if (tsSIMDEnable && tsAVX512Enable) {
#if __AVX512F__
@ -94,7 +94,7 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
#endif
} else { // alternative implementation without SIMD instructions.
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
p[_pos++] = prev_value;
p[_pos++] = prevValue;
v += bit;
}
}
@ -118,16 +118,16 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
__m256i delta = _mm256_xor_si256(_mm256_srli_epi64(zigzagVal, 1), signmask);
// calculate the cumulative sum (prefix sum) for each number
// decode[0] = prev_value + final[0]
// decode[1] = decode[0] + final[1] -----> prev_value + final[0] + final[1]
// decode[2] = decode[1] + final[2] -----> prev_value + final[0] + final[1] + final[2]
// decode[3] = decode[2] + final[3] -----> prev_value + final[0] + final[1] + final[2] + final[3]
// decode[0] = prevValue + final[0]
// decode[1] = decode[0] + final[1] -----> prevValue + final[0] + final[1]
// decode[2] = decode[1] + final[2] -----> prevValue + final[0] + final[1] + final[2]
// decode[3] = decode[2] + final[3] -----> prevValue + final[0] + final[1] + final[2] + final[3]
// 1, 2, 3, 4
//+ 0, 1, 0, 3
// 1, 3, 3, 7
// shift and add for the first round
__m128i prev = _mm_set1_epi64x(prev_value);
__m128i prev = _mm_set1_epi64x(prevValue);
__m256i x = _mm256_slli_si256(delta, 8);
delta = _mm256_add_epi64(delta, x);
@ -148,16 +148,16 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
_mm_storeu_si128((__m128i *)&p[_pos + 2], secPart);
shiftBits = _mm256_add_epi64(shiftBits, inc);
prev_value = p[_pos + 3];
prevValue = p[_pos + 3];
_pos += 4;
}
// handle the remain value
for (int32_t i = 0; i < remain; i++) {
zigzag_value = ((w >> (v + (batch * bit * 4))) & mask);
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
prevValue += ZIGZAG_DECODE(int64_t, zigzag_value);
p[_pos++] = prev_value;
p[_pos++] = prevValue;
v += bit;
}
} else if (tsSIMDEnable && tsAVX512Enable) {
@ -167,9 +167,9 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
} else { // alternative implementation without SIMD instructions.
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
zigzag_value = ((w >> v) & mask);
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
prevValue += ZIGZAG_DECODE(int64_t, zigzag_value);
p[_pos++] = prev_value;
p[_pos++] = prevValue;
v += bit;
}
}
@ -180,14 +180,14 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
if (selector == 0 || selector == 1) {
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
p[_pos++] = (int32_t)prev_value;
p[_pos++] = (int32_t)prevValue;
}
} else {
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
zigzag_value = ((w >> v) & mask);
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
prevValue += ZIGZAG_DECODE(int64_t, zigzag_value);
p[_pos++] = (int32_t)prev_value;
p[_pos++] = (int32_t)prevValue;
v += bit;
}
}
@ -197,14 +197,14 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
if (selector == 0 || selector == 1) {
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
p[_pos++] = (int16_t)prev_value;
p[_pos++] = (int16_t)prevValue;
}
} else {
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
zigzag_value = ((w >> v) & mask);
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
prevValue += ZIGZAG_DECODE(int64_t, zigzag_value);
p[_pos++] = (int16_t)prev_value;
p[_pos++] = (int16_t)prevValue;
v += bit;
}
}
@ -215,14 +215,14 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
if (selector == 0 || selector == 1) {
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
p[_pos++] = (int8_t)prev_value;
p[_pos++] = (int8_t)prevValue;
}
} else {
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
zigzag_value = ((w >> v) & mask);
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
prevValue += ZIGZAG_DECODE(int64_t, zigzag_value);
p[_pos++] = (int8_t)prev_value;
p[_pos++] = (int8_t)prevValue;
v += bit;
}
}
@ -246,6 +246,77 @@ int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelem
// todo add later
int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output) {
#if __AVX2__
#endif
return 0;
}
int32_t tsDecompressTimestampAvx2(const char* const input, const int32_t nelements, char *const output, bool bigEndian) {
int64_t *ostream = (int64_t *)output;
int32_t ipos = 1, opos = 0;
int8_t nbytes = 0;
int64_t prevValue = 0;
int64_t prevDelta = 0;
int64_t deltaOfDelta = 0;
int32_t longBytes = LONG_BYTES;
#if __AVX2__
int32_t batch = nelements >> 2;
int32_t remainder = nelements & 0x1;
while (1) {
uint8_t flags = input[ipos++];
// Decode dd1
uint64_t dd1 = 0;
nbytes = flags & INT8MASK(4);
if (nbytes == 0) {
deltaOfDelta = 0;
} else {
if (bigEndian) {
memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes);
} else {
memcpy(&dd1, input + ipos, nbytes);
}
deltaOfDelta = ZIGZAG_DECODE(int64_t, dd1);
}
ipos += nbytes;
prevDelta += deltaOfDelta;
prevValue += prevDelta;
ostream[opos++] = prevValue;
if (opos == nelements) {
return nelements * longBytes;
}
// Decode dd2
uint64_t dd2 = 0;
nbytes = (flags >> 4) & INT8MASK(4);
if (nbytes == 0) {
deltaOfDelta = 0;
} else {
if (bigEndian) {
memcpy(((char *)(&dd2)) + longBytes - nbytes, input + ipos, nbytes);
} else {
memcpy(&dd2, input + ipos, nbytes);
}
// zigzag_decoding
deltaOfDelta = ZIGZAG_DECODE(int64_t, dd2);
}
ipos += nbytes;
prevDelta += deltaOfDelta;
prevValue += prevDelta;
ostream[opos++] = prevValue;
if (opos == nelements) {
return nelements * longBytes;
}
}
#endif
return 0;
}

View File

@ -101,6 +101,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing d
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_DATA_FMT, "Invalid data format")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CFG_VALUE, "Invalid configuration value")
TAOS_DEFINE_ERROR(TSDB_CODE_IP_NOT_IN_WHITE_LIST, "Not allowed to connect")
TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_CONNECT_S3, "Failed to connect to s3 server")
//client
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")

View File

@ -1068,6 +1068,7 @@ e
,,y,script,./test.sh -f tsim/query/show_db_table_kind.sim
,,y,script,./test.sh -f tsim/query/bi_star_table.sim
,,y,script,./test.sh -f tsim/query/bi_tag_scan.sim
,,y,script,./test.sh -f tsim/query/bi_tbname_col.sim
,,y,script,./test.sh -f tsim/query/tag_scan.sim
,,y,script,./test.sh -f tsim/query/nullColSma.sim
,,y,script,./test.sh -f tsim/query/bug3398.sim

View File

@ -0,0 +1,36 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
sql drop database if exists db1;
sql create database db1 vgroups 3;
sql create database db1;
sql use db1;
sql create stable sta (ts timestamp, f1 int, f2 binary(200)) tags(t1 int, t2 int, t3 int);
sql create stable stb (ts timestamp, f1 int, f2 binary(200)) tags(t1 int, t2 int, t3 int);
sql create table tba1 using sta tags(1, 1, 1);
sql create table tba2 using sta tags(2, 2, 2);
sql insert into tba1 values(now, 1, "1")(now+3s, 3, "3")(now+5s, 5, "5");
sql insert into tba2 values(now + 1s, 2, "2")(now+2s, 2, "2")(now+4s, 4, "4");
sql create table tbn1 (ts timestamp, f1 int);
set_bi_mode 1
sql select `tbname`, f1, f2 from sta order by ts
print $rows
print $data00 $data01 $data02 $data10 $data11 $data12
if $rows != 6 then
return -1
endi
if $data00 != @tba1@ then
return -1
endi
if $data10 != @tba2@ then
return -1
endi
sql_error create table stc(ts timestamp, `tbname` binary(200));
sql_error create table std(ts timestamp, f1 int) tags(`tbname` binary(200));
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -13,6 +13,7 @@
import random
import string
import threading
from util.log import *
from util.cases import *
from util.sql import *
@ -25,10 +26,24 @@ class TDTestCase:
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.setsql = TDSetSql()
self.fname = __file__ + '.tmp.sql'
self.dbname = 'db1'
self.ntbname = 'ntb'
self.stbname = 'stb'
self.stbnum = 10
self.ntbnum = 10
self.colnum = 52
self.tagnum = 15
self.collen = 320
self.colnum_modify = 40
self.tagnum_modify = 40
self.collen_old_modify = 160
self.collen_new_modify = 455
self.taglen_old_modify = 80
self.taglen_new_modify = 155
self.binary_length = 20 # the length of binary for column_dict
self.nchar_length = 20 # the length of nchar for column_dict
self.threadnum = 2
self.column_dict = {
'ts' : 'timestamp',
'col1': 'tinyint',
@ -183,9 +198,114 @@ class TDTestCase:
tdLog.info(res)
assert(res[1][2] == 39001)
def prepareAlterEnv(self):
tdSql.execute(f'drop database if exists {self.dbname}')
tdSql.execute(f'create database if not exists {self.dbname} vgroups 2')
tdSql.execute(f'use {self.dbname}')
def destroyAlterEnv(self):
tdSql.execute(f'drop database if exists {self.dbname}')
def alterTableTask(self, i):
os.system(f'taos -f {self.fname}.{i};')
def executeAlterTable(self, opt):
threads = []
for i in range(self.threadnum):
thread = threading.Thread(target=self.alterTableTask, args=(i,))
threads.append(thread)
thread.start()
for i in range(self.threadnum):
threads[i].join()
def checkAlterTable(self, opt):
if opt in ["stb_add_col", "stb_add_tag"]:
for i in range(self.stbnum):
tdSql.execute(f'desc {self.stbname}_{i}')
elif opt in ["stb_modify_col", "stb_modify_tag"]:
for i in range(self.stbnum):
tdSql.execute(f'desc {self.stbname}_{i}')
elif opt in ["ntb_add_col", "ntb_modify_col"]:
for i in range(self.ntbnum):
tdSql.execute(f'desc {self.ntbname}_{i}')
def destroyAlterTable(self):
for i in range(self.threadnum):
if os.path.isfile(f'{self.fname}.{i}'):
os.remove(f'{self.fname}.{i}')
def prepareAlterTable(self, opt):
self.destroyAlterTable()
lines = [f'use {self.dbname};\n']
if opt in ["stb_add_col", "stb_add_tag"]:
for i in range(self.stbnum):
tdSql.execute(f'create table if not exists {self.stbname}_{i} (ts timestamp, c_0 NCHAR({self.collen})) tags(t0 nchar({self.collen}));')
for i in range(self.stbnum):
if opt == 'stb_add_col':
for c in range(1, self.colnum):
lines.append(f'alter table {self.stbname}_{i} add column c_{c} NCHAR({self.collen});\n')
else:
for c in range(1, self.tagnum):
lines.append(f'alter table {self.stbname}_{i} add tag t_{c} NCHAR({self.collen});\n')
elif opt in ["stb_modify_col", "stb_modify_tag"]:
for i in range(self.stbnum):
createTbSql = f'CREATE table if not exists {self.stbname}_{i} (ts timestamp'
for j in range(self.colnum_modify):
createTbSql += f',c_{j} NCHAR({self.collen_old_modify})'
createTbSql += f') tags(t_0 NCHAR({self.taglen_old_modify})'
for k in range(1,self.tagnum_modify):
createTbSql += f',t_{k} NCHAR({self.taglen_old_modify})'
createTbSql += f');'
tdLog.info(createTbSql)
tdSql.execute(createTbSql)
for i in range(self.stbnum):
if opt == 'stb_modify_col':
for c in range(self.colnum_modify):
lines.append(f'alter table {self.stbname}_{i} modify column c_{c} NCHAR({self.collen_new_modify});\n')
else:
for c in range(self.tagnum_modify):
lines.append(f'alter table {self.stbname}_{i} modify tag t_{c} NCHAR({self.taglen_new_modify});\n')
elif opt in ['ntb_add_col']:
for i in range(self.ntbnum):
tdSql.execute(f'create table if not exists {self.ntbname}_{i} (ts timestamp, c_0 NCHAR({self.collen}));')
for i in range(self.ntbnum):
for c in range(1, self.colnum):
lines.append(f'alter table {self.ntbname}_{i} add column c_{c} NCHAR({self.collen});\n')
elif opt in ['ntb_modify_col']:
for i in range(self.ntbnum):
createTbSql = f'CREATE table if not exists {self.ntbname}_{i} (ts timestamp'
for j in range(self.colnum_modify):
createTbSql += f',c_{j} NCHAR({self.collen_old_modify})'
createTbSql += f');'
tdLog.info(createTbSql)
tdSql.execute(createTbSql)
for i in range(self.ntbnum):
for c in range(self.colnum_modify):
lines.append(f'alter table {self.ntbname}_{i} modify column c_{c} NCHAR({self.collen_new_modify});\n')
# generate sql file
with open(f'{self.fname}.0', "a") as f:
f.writelines(lines)
# clone sql file in case of race condition
for i in range(1, self.threadnum):
shutil.copy(f'{self.fname}.0', f'{self.fname}.{i}')
def alter_stable_multi_client_check(self):
"""Check alter stable/ntable var type column/tag(PI-23)
"""
alter_table_check_type = ["stb_add_col", "stb_add_tag", "stb_modify_col", "stb_modify_tag", "ntb_add_col", "ntb_modify_col"]
for opt in alter_table_check_type:
self.prepareAlterEnv()
self.prepareAlterTable(opt)
self.executeAlterTable(opt)
self.checkAlterTable(opt)
self.destroyAlterTable()
self.destroyAlterEnv()
def run(self):
self.alter_stable_check()
self.alter_stable_column_varchar_39001()
self.alter_stable_multi_client_check()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)

View File

@ -58,7 +58,11 @@ class TDTestCase:
elif unit.lower() == '1w':
for i in range(len(self.ts_str)):
ts_result = self.get_time.get_ms_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]/1000/60/60/24/7)*7*24*60*60*1000)
if ignore_tz == 0:
tdSql.checkEqual(ts_result,int(date_time[i]/1000/60/60/24/7)*7*24*60*60*1000)
else:
# assuming the client timezone is UTC+0800
tdSql.checkEqual(ts_result,int(date_time[i] - (date_time[i] + 8 * 3600 * 1000) % (86400 * 7 * 1000)))
def check_us_timestamp(self,unit,date_time, ignore_tz):
if unit.lower() == '1u':
@ -92,7 +96,11 @@ class TDTestCase:
elif unit.lower() == '1w':
for i in range(len(self.ts_str)):
ts_result = self.get_time.get_us_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]/1000/1000/60/60/24/7)*7*24*60*60*1000*1000)
if ignore_tz == 0:
tdSql.checkEqual(ts_result,int(date_time[i]/1000/1000/60/60/24/7)*7*24*60*60*1000*1000)
else:
# assuming the client timezone is UTC+0800
tdSql.checkEqual(ts_result,int(date_time[i] - (date_time[i] + 8 * 3600 * 1000000) % (86400 * 7 * 1000000)))
def check_ns_timestamp(self,unit,date_time, ignore_tz):
if unit.lower() == '1b':
@ -130,7 +138,11 @@ class TDTestCase:
elif unit.lower() == '1w':
for i in range(len(self.ts_str)):
if self.rest_tag != 'rest':
tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i]*1000/1000/1000/1000/1000/60/60/24/7)*7*24*60*60*1000*1000*1000)
if ignore_tz == 0:
tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i]*1000/1000/1000/1000/1000/60/60/24/7)*7*24*60*60*1000*1000*1000)
else:
# assuming the client timezone is UTC+0800
tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i] - (date_time[i] + 8 * 3600 * 1000000000) % (86400 * 7 * 1000000000)))
def check_tb_type(self,unit,tb_type,ignore_tz):
if tb_type.lower() == 'ntb':

View File

@ -693,8 +693,8 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) {
ret = 0;
break;
} else if (ret != 0) {
simDebug("script:%s, taos:%p, %s failed, ret:%d:%s, error:%s", script->fileName, script->taos, rest, ret & 0XFFFF,
tstrerror(ret), taos_errstr(pSql));
simDebug("script:%s, taos:%p, %s failed, ret:%d:%s", script->fileName, script->taos, rest, ret & 0XFFFF,
tstrerror(ret));
if (line->errorJump == SQL_JUMP_TRUE) {
script->linePos = line->jump;