Merge branch 'main' into feature/3_liaohj
This commit is contained in:
commit
53f27901a5
|
@ -2,7 +2,7 @@
|
||||||
# taos-tools
|
# taos-tools
|
||||||
ExternalProject_Add(taos-tools
|
ExternalProject_Add(taos-tools
|
||||||
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
|
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
|
||||||
GIT_TAG 94d6895
|
GIT_TAG 5aa25e9
|
||||||
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
|
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
|
||||||
BINARY_DIR ""
|
BINARY_DIR ""
|
||||||
#BUILD_IN_SOURCE TRUE
|
#BUILD_IN_SOURCE TRUE
|
||||||
|
|
|
@ -41,6 +41,12 @@ typedef struct SBlockOrderInfo {
|
||||||
BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \
|
BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
#define colDataSetNull_f_s(c_, r_) \
|
||||||
|
do { \
|
||||||
|
colDataSetNull_f((c_)->nullbitmap, r_); \
|
||||||
|
memset(((char*)(c_)->pData) + (c_)->info.bytes * (r_), 0, (c_)->info.bytes); \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
#define colDataClearNull_f(bm_, r_) \
|
#define colDataClearNull_f(bm_, r_) \
|
||||||
do { \
|
do { \
|
||||||
BMCharPos(bm_, r_) &= ((char)(~(1u << (7u - BitPos(r_))))); \
|
BMCharPos(bm_, r_) &= ((char)(~(1u << (7u - BitPos(r_))))); \
|
||||||
|
@ -136,7 +142,7 @@ static FORCE_INLINE void colDataAppendNULL(SColumnInfoData* pColumnInfoData, uin
|
||||||
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
||||||
colDataSetNull_var(pColumnInfoData, currentRow); // it is a null value of VAR type.
|
colDataSetNull_var(pColumnInfoData, currentRow); // it is a null value of VAR type.
|
||||||
} else {
|
} else {
|
||||||
colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow);
|
colDataSetNull_f_s(pColumnInfoData, currentRow);
|
||||||
}
|
}
|
||||||
|
|
||||||
pColumnInfoData->hasNull = true;
|
pColumnInfoData->hasNull = true;
|
||||||
|
@ -151,6 +157,7 @@ static FORCE_INLINE void colDataAppendNNULL(SColumnInfoData* pColumnInfoData, ui
|
||||||
for (int32_t i = start; i < start + nRows; ++i) {
|
for (int32_t i = start; i < start + nRows; ++i) {
|
||||||
colDataSetNull_f(pColumnInfoData->nullbitmap, i);
|
colDataSetNull_f(pColumnInfoData->nullbitmap, i);
|
||||||
}
|
}
|
||||||
|
memset(pColumnInfoData->pData + start * pColumnInfoData->info.bytes, 0, pColumnInfoData->info.bytes * nRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
pColumnInfoData->hasNull = true;
|
pColumnInfoData->hasNull = true;
|
||||||
|
@ -231,7 +238,6 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
|
||||||
|
|
||||||
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload);
|
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload);
|
||||||
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
|
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
|
||||||
int32_t blockDataEnsureCapacityNoClear(SSDataBlock* pDataBlock, uint32_t numOfRows);
|
|
||||||
|
|
||||||
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows);
|
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows);
|
||||||
void blockDataCleanup(SSDataBlock* pDataBlock);
|
void blockDataCleanup(SSDataBlock* pDataBlock);
|
||||||
|
|
|
@ -273,7 +273,7 @@ if [ "$osType" != "Darwin" ]; then
|
||||||
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
|
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
|
||||||
|
|
||||||
${csudo}./make-taos-tools-deb.sh ${top_dir} \
|
${csudo}./make-taos-tools-deb.sh ${top_dir} \
|
||||||
${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType}
|
${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType} ${verNumberComp}
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
else
|
else
|
||||||
|
@ -298,7 +298,7 @@ if [ "$osType" != "Darwin" ]; then
|
||||||
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
|
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
|
||||||
|
|
||||||
${csudo}./make-taos-tools-rpm.sh ${top_dir} \
|
${csudo}./make-taos-tools-rpm.sh ${top_dir} \
|
||||||
${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType}
|
${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType} ${verNumberComp}
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
else
|
else
|
||||||
|
|
|
@ -69,7 +69,7 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
|
||||||
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
||||||
pColumnInfoData->varmeta.offset[currentRow] = -1; // it is a null value of VAR type.
|
pColumnInfoData->varmeta.offset[currentRow] = -1; // it is a null value of VAR type.
|
||||||
} else {
|
} else {
|
||||||
colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow);
|
colDataSetNull_f_s(pColumnInfoData, currentRow);
|
||||||
}
|
}
|
||||||
|
|
||||||
pColumnInfoData->hasNull = true;
|
pColumnInfoData->hasNull = true;
|
||||||
|
@ -825,7 +825,7 @@ static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataB
|
||||||
} else {
|
} else {
|
||||||
for (int32_t j = 0; j < pDataBlock->info.rows; ++j) {
|
for (int32_t j = 0; j < pDataBlock->info.rows; ++j) {
|
||||||
if (colDataIsNull_f(pSrc->nullbitmap, index[j])) {
|
if (colDataIsNull_f(pSrc->nullbitmap, index[j])) {
|
||||||
colDataSetNull_f(pDst->nullbitmap, j);
|
colDataSetNull_f_s(pDst, j);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
memcpy(pDst->pData + j * pDst->info.bytes, pSrc->pData + index[j] * pDst->info.bytes, pDst->info.bytes);
|
memcpy(pDst->pData + j * pDst->info.bytes, pSrc->pData + index[j] * pDst->info.bytes, pDst->info.bytes);
|
||||||
|
@ -1161,15 +1161,16 @@ void blockDataEmpty(SSDataBlock* pDataBlock) {
|
||||||
pInfo->window.skey = 0;
|
pInfo->window.skey = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo temporarily disable it
|
/*
|
||||||
|
* NOTE: the type of the input column may be TSDB_DATA_TYPE_NULL, which is used to denote
|
||||||
|
* the all NULL value in this column. It is an internal representation of all NULL value column, and no visible to
|
||||||
|
* any users. The length of TSDB_DATA_TYPE_NULL is 0, and it is an special case.
|
||||||
|
*/
|
||||||
static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows, bool clearPayload) {
|
static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows, bool clearPayload) {
|
||||||
if (numOfRows <= 0 || numOfRows <= pBlockInfo->capacity) {
|
if (numOfRows <= 0 || numOfRows <= pBlockInfo->capacity) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo temp disable it
|
|
||||||
// ASSERT(pColumn->info.bytes != 0);
|
|
||||||
|
|
||||||
int32_t existedRows = pBlockInfo->rows;
|
int32_t existedRows = pBlockInfo->rows;
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
|
if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
|
||||||
|
@ -1194,7 +1195,8 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo*
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
// make sure the allocated memory is MALLOC_ALIGN_BYTES aligned
|
// here we employ the aligned malloc function, to make sure that the address of allocated memory is aligned
|
||||||
|
// to MALLOC_ALIGN_BYTES
|
||||||
tmp = taosMemoryMallocAlign(MALLOC_ALIGN_BYTES, numOfRows * pColumn->info.bytes);
|
tmp = taosMemoryMallocAlign(MALLOC_ALIGN_BYTES, numOfRows * pColumn->info.bytes);
|
||||||
if (tmp == NULL) {
|
if (tmp == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -1208,7 +1210,7 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo*
|
||||||
|
|
||||||
pColumn->pData = tmp;
|
pColumn->pData = tmp;
|
||||||
|
|
||||||
// todo remove it soon
|
// check if the allocated memory is aligned to the requried bytes.
|
||||||
#if defined LINUX
|
#if defined LINUX
|
||||||
if ((((uint64_t)pColumn->pData) & (MALLOC_ALIGN_BYTES - 1)) != 0x0) {
|
if ((((uint64_t)pColumn->pData) & (MALLOC_ALIGN_BYTES - 1)) != 0x0) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
|
@ -1249,25 +1251,6 @@ int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
|
||||||
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
|
|
||||||
code = doEnsureCapacity(p, &pDataBlock->info, numOfRows, true);
|
|
||||||
if (code) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pDataBlock->info.capacity = numOfRows;
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t blockDataEnsureCapacityNoClear(SSDataBlock* pDataBlock, uint32_t numOfRows) {
|
|
||||||
int32_t code = 0;
|
|
||||||
if (numOfRows == 0 || numOfRows <= pDataBlock->info.capacity) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
|
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
|
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||||
|
|
|
@ -14,8 +14,8 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "vmInt.h"
|
|
||||||
#include "tjson.h"
|
#include "tjson.h"
|
||||||
|
#include "vmInt.h"
|
||||||
|
|
||||||
#define MAX_CONTENT_LEN 2 * 1024 * 1024
|
#define MAX_CONTENT_LEN 2 * 1024 * 1024
|
||||||
|
|
||||||
|
@ -46,102 +46,108 @@ SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) {
|
||||||
return pVnodes;
|
return pVnodes;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) {
|
static int32_t vmDecodeVnodeList(SJson *pJson, SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) {
|
||||||
int32_t code = TSDB_CODE_INVALID_JSON_FORMAT;
|
int32_t code = -1;
|
||||||
int32_t len = 0;
|
|
||||||
int32_t maxLen = MAX_CONTENT_LEN;
|
|
||||||
char *content = taosMemoryCalloc(1, maxLen + 1);
|
|
||||||
cJSON *root = NULL;
|
|
||||||
FILE *fp = NULL;
|
|
||||||
char file[PATH_MAX] = {0};
|
|
||||||
SWrapperCfg *pCfgs = NULL;
|
SWrapperCfg *pCfgs = NULL;
|
||||||
TdFilePtr pFile = NULL;
|
*ppCfgs = NULL;
|
||||||
|
|
||||||
snprintf(file, sizeof(file), "%s%svnodes.json", pMgmt->path, TD_DIRSEP);
|
SJson *vnodes = tjsonGetObjectItem(pJson, "vnodes");
|
||||||
|
if (vnodes == NULL) return -1;
|
||||||
pFile = taosOpenFile(file, TD_FILE_READ);
|
|
||||||
if (pFile == NULL) {
|
|
||||||
dInfo("file %s not exist", file);
|
|
||||||
code = 0;
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (content == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
len = (int32_t)taosReadFile(pFile, content, maxLen);
|
|
||||||
if (len <= 0) {
|
|
||||||
dError("failed to read %s since content is null", file);
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
content[len] = 0;
|
|
||||||
root = cJSON_Parse(content);
|
|
||||||
if (root == NULL) {
|
|
||||||
dError("failed to read %s since invalid json format", file);
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
cJSON *vnodes = cJSON_GetObjectItem(root, "vnodes");
|
|
||||||
if (!vnodes || vnodes->type != cJSON_Array) {
|
|
||||||
dError("failed to read %s since vnodes not found", file);
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t vnodesNum = cJSON_GetArraySize(vnodes);
|
int32_t vnodesNum = cJSON_GetArraySize(vnodes);
|
||||||
if (vnodesNum > 0) {
|
if (vnodesNum > 0) {
|
||||||
pCfgs = taosMemoryCalloc(vnodesNum, sizeof(SWrapperCfg));
|
pCfgs = taosMemoryCalloc(vnodesNum, sizeof(SWrapperCfg));
|
||||||
if (pCfgs == NULL) {
|
if (pCfgs == NULL) return -1;
|
||||||
dError("failed to read %s since out of memory", file);
|
}
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _OVER;
|
for (int32_t i = 0; i < vnodesNum; ++i) {
|
||||||
}
|
SJson *vnode = tjsonGetArrayItem(vnodes, i);
|
||||||
|
if (vnode == NULL) goto _OVER;
|
||||||
for (int32_t i = 0; i < vnodesNum; ++i) {
|
|
||||||
cJSON *vnode = cJSON_GetArrayItem(vnodes, i);
|
SWrapperCfg *pCfg = &pCfgs[i];
|
||||||
SWrapperCfg *pCfg = &pCfgs[i];
|
tjsonGetInt32ValueFromDouble(vnode, "vgId", pCfg->vgId, code);
|
||||||
|
if (code < 0) goto _OVER;
|
||||||
cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId");
|
tjsonGetInt32ValueFromDouble(vnode, "dropped", pCfg->dropped, code);
|
||||||
if (!vgId || vgId->type != cJSON_Number) {
|
if (code < 0) goto _OVER;
|
||||||
dError("failed to read %s since vgId not found", file);
|
tjsonGetInt32ValueFromDouble(vnode, "vgVersion", pCfg->vgVersion, code);
|
||||||
taosMemoryFree(pCfgs);
|
if (code < 0) goto _OVER;
|
||||||
goto _OVER;
|
|
||||||
}
|
snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCfg->vgId);
|
||||||
pCfg->vgId = vgId->valueint;
|
|
||||||
snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCfg->vgId);
|
|
||||||
|
|
||||||
cJSON *dropped = cJSON_GetObjectItem(vnode, "dropped");
|
|
||||||
if (!dropped || dropped->type != cJSON_Number) {
|
|
||||||
dError("failed to read %s since dropped not found", file);
|
|
||||||
taosMemoryFree(pCfgs);
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
pCfg->dropped = dropped->valueint;
|
|
||||||
|
|
||||||
cJSON *vgVersion = cJSON_GetObjectItem(vnode, "vgVersion");
|
|
||||||
if (!vgVersion || vgVersion->type != cJSON_Number) {
|
|
||||||
dError("failed to read %s since vgVersion not found", file);
|
|
||||||
taosMemoryFree(pCfgs);
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
pCfg->vgVersion = vgVersion->valueint;
|
|
||||||
}
|
|
||||||
|
|
||||||
*ppCfgs = pCfgs;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
*numOfVnodes = vnodesNum;
|
|
||||||
code = 0;
|
code = 0;
|
||||||
dInfo("succcessed to read file %s, numOfVnodes:%d", file, vnodesNum);
|
*ppCfgs = pCfgs;
|
||||||
|
*numOfVnodes = vnodesNum;
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (content != NULL) taosMemoryFree(content);
|
if (*ppCfgs == NULL) taosMemoryFree(pCfgs);
|
||||||
if (root != NULL) cJSON_Delete(root);
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) {
|
||||||
|
int32_t code = -1;
|
||||||
|
TdFilePtr pFile = NULL;
|
||||||
|
char *pData = NULL;
|
||||||
|
SJson *pJson = NULL;
|
||||||
|
char file[PATH_MAX] = {0};
|
||||||
|
SWrapperCfg *pCfgs = NULL;
|
||||||
|
snprintf(file, sizeof(file), "%s%svnodes.json", pMgmt->path, TD_DIRSEP);
|
||||||
|
|
||||||
|
if (taosStatFile(file, NULL, NULL) < 0) {
|
||||||
|
dInfo("vnode file:%s not exist", file);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
pFile = taosOpenFile(file, TD_FILE_READ);
|
||||||
|
if (pFile == NULL) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
dError("failed to open vnode file:%s since %s", file, terrstr());
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t size = 0;
|
||||||
|
if (taosFStatFile(pFile, &size, NULL) < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
dError("failed to fstat mnode file:%s since %s", file, terrstr());
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
pData = taosMemoryMalloc(size + 1);
|
||||||
|
if (pData == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosReadFile(pFile, pData, size) != size) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
dError("failed to read vnode file:%s since %s", file, terrstr());
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
pData[size] = '\0';
|
||||||
|
|
||||||
|
pJson = tjsonParse(pData);
|
||||||
|
if (pJson == NULL) {
|
||||||
|
terrno = TSDB_CODE_INVALID_JSON_FORMAT;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (vmDecodeVnodeList(pJson, pMgmt, ppCfgs, numOfVnodes) < 0) {
|
||||||
|
terrno = TSDB_CODE_INVALID_JSON_FORMAT;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = 0;
|
||||||
|
dInfo("succceed to read vnode file %s", file);
|
||||||
|
|
||||||
|
_OVER:
|
||||||
|
if (pData != NULL) taosMemoryFree(pData);
|
||||||
|
if (pJson != NULL) cJSON_Delete(pJson);
|
||||||
if (pFile != NULL) taosCloseFile(&pFile);
|
if (pFile != NULL) taosCloseFile(&pFile);
|
||||||
|
|
||||||
terrno = code;
|
if (code != 0) {
|
||||||
|
dError("failed to read vnode file:%s since %s", file, terrstr());
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -41,14 +41,49 @@ static void dmGetDnodeEp(SDnodeData *pData, int32_t dnodeId, char *pEp, char *pF
|
||||||
taosThreadRwlockUnlock(&pData->lock);
|
taosThreadRwlockUnlock(&pData->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t dmDecodeEps(SJson *pJson, SDnodeData *pData) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
tjsonGetInt32ValueFromDouble(pJson, "dnodeId", pData->dnodeId, code);
|
||||||
|
if (code < 0) return -1;
|
||||||
|
tjsonGetNumberValue(pJson, "dnodeVer", pData->dnodeVer, code);
|
||||||
|
if (code < 0) return -1;
|
||||||
|
tjsonGetNumberValue(pJson, "clusterId", pData->clusterId, code);
|
||||||
|
if (code < 0) return -1;
|
||||||
|
tjsonGetInt32ValueFromDouble(pJson, "dropped", pData->dropped, code);
|
||||||
|
if (code < 0) return -1;
|
||||||
|
|
||||||
|
SJson *dnodes = tjsonGetObjectItem(pJson, "dnodes");
|
||||||
|
if (dnodes == NULL) return 0;
|
||||||
|
int32_t numOfDnodes = tjsonGetArraySize(dnodes);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfDnodes; ++i) {
|
||||||
|
SJson *dnode = tjsonGetArrayItem(dnodes, i);
|
||||||
|
if (dnode == NULL) return -1;
|
||||||
|
|
||||||
|
SDnodeEp dnodeEp = {0};
|
||||||
|
tjsonGetInt32ValueFromDouble(dnode, "id", dnodeEp.id, code);
|
||||||
|
if (code < 0) return -1;
|
||||||
|
code = tjsonGetStringValue(dnode, "fqdn", dnodeEp.ep.fqdn);
|
||||||
|
if (code < 0) return -1;
|
||||||
|
tjsonGetUInt16ValueFromDouble(dnode, "port", dnodeEp.ep.port, code);
|
||||||
|
if (code < 0) return -1;
|
||||||
|
tjsonGetInt8ValueFromDouble(dnode, "isMnode", dnodeEp.isMnode, code);
|
||||||
|
if (code < 0) return -1;
|
||||||
|
|
||||||
|
if (taosArrayPush(pData->dnodeEps, &dnodeEp) == NULL) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t dmReadEps(SDnodeData *pData) {
|
int32_t dmReadEps(SDnodeData *pData) {
|
||||||
int32_t code = TSDB_CODE_INVALID_JSON_FORMAT;
|
int32_t code = -1;
|
||||||
int32_t len = 0;
|
|
||||||
int32_t maxLen = 256 * 1024;
|
|
||||||
char *content = taosMemoryCalloc(1, maxLen + 1);
|
|
||||||
cJSON *root = NULL;
|
|
||||||
char file[PATH_MAX] = {0};
|
|
||||||
TdFilePtr pFile = NULL;
|
TdFilePtr pFile = NULL;
|
||||||
|
char *content = NULL;
|
||||||
|
SJson *pJson = NULL;
|
||||||
|
char file[PATH_MAX] = {0};
|
||||||
|
snprintf(file, sizeof(file), "%s%sdnode%sdnode.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);
|
||||||
|
|
||||||
pData->dnodeEps = taosArrayInit(1, sizeof(SDnodeEp));
|
pData->dnodeEps = taosArrayInit(1, sizeof(SDnodeEp));
|
||||||
if (pData->dnodeEps == NULL) {
|
if (pData->dnodeEps == NULL) {
|
||||||
|
@ -56,113 +91,63 @@ int32_t dmReadEps(SDnodeData *pData) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
snprintf(file, sizeof(file), "%s%sdnode%sdnode.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);
|
if (taosStatFile(file, NULL, NULL) < 0) {
|
||||||
pFile = taosOpenFile(file, TD_FILE_READ);
|
dInfo("dnode file:%s not exist", file);
|
||||||
if (pFile == NULL) {
|
|
||||||
code = 0;
|
code = 0;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
len = (int32_t)taosReadFile(pFile, content, maxLen);
|
pFile = taosOpenFile(file, TD_FILE_READ);
|
||||||
if (len <= 0) {
|
if (pFile == NULL) {
|
||||||
dError("failed to read %s since content is null", file);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
dError("failed to open dnode file:%s since %s", file, terrstr());
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
content[len] = 0;
|
int64_t size = 0;
|
||||||
root = cJSON_Parse(content);
|
if (taosFStatFile(pFile, &size, NULL) < 0) {
|
||||||
if (root == NULL) {
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
dError("failed to read %s since invalid json format", file);
|
dError("failed to fstat dnode file:%s since %s", file, terrstr());
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId");
|
content = taosMemoryMalloc(size + 1);
|
||||||
if (!dnodeId || dnodeId->type != cJSON_Number) {
|
if (content == NULL) {
|
||||||
dError("failed to read %s since dnodeId not found", file);
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
pData->dnodeId = dnodeId->valueint;
|
|
||||||
|
|
||||||
cJSON *dnodeVer = cJSON_GetObjectItem(root, "dnodeVer");
|
|
||||||
if (!dnodeVer || dnodeVer->type != cJSON_String) {
|
|
||||||
dError("failed to read %s since dnodeVer not found", file);
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
pData->dnodeVer = atoll(dnodeVer->valuestring);
|
|
||||||
|
|
||||||
cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
|
|
||||||
if (!clusterId || clusterId->type != cJSON_String) {
|
|
||||||
dError("failed to read %s since clusterId not found", file);
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
pData->clusterId = atoll(clusterId->valuestring);
|
|
||||||
|
|
||||||
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
|
|
||||||
if (!dropped || dropped->type != cJSON_Number) {
|
|
||||||
dError("failed to read %s since dropped not found", file);
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
pData->dropped = dropped->valueint;
|
|
||||||
|
|
||||||
cJSON *dnodes = cJSON_GetObjectItem(root, "dnodes");
|
|
||||||
if (!dnodes || dnodes->type != cJSON_Array) {
|
|
||||||
dError("failed to read %s since dnodes not found", file);
|
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfDnodes = cJSON_GetArraySize(dnodes);
|
if (taosReadFile(pFile, content, size) != size) {
|
||||||
if (numOfDnodes <= 0) {
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
dError("failed to read %s since numOfDnodes:%d invalid", file, numOfDnodes);
|
dError("failed to read dnode file:%s since %s", file, terrstr());
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfDnodes; ++i) {
|
content[size] = '\0';
|
||||||
cJSON *node = cJSON_GetArrayItem(dnodes, i);
|
|
||||||
if (node == NULL) break;
|
|
||||||
|
|
||||||
SDnodeEp dnodeEp = {0};
|
pJson = tjsonParse(content);
|
||||||
|
if (pJson == NULL) {
|
||||||
|
terrno = TSDB_CODE_INVALID_JSON_FORMAT;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
cJSON *did = cJSON_GetObjectItem(node, "id");
|
if (dmDecodeEps(pJson, pData) < 0) {
|
||||||
if (!did || did->type != cJSON_Number) {
|
terrno = TSDB_CODE_INVALID_JSON_FORMAT;
|
||||||
dError("failed to read %s since dnodeId not found", file);
|
goto _OVER;
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
dnodeEp.id = did->valueint;
|
|
||||||
|
|
||||||
cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn");
|
|
||||||
if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
|
|
||||||
dError("failed to read %s since dnodeFqdn not found", file);
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
tstrncpy(dnodeEp.ep.fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);
|
|
||||||
|
|
||||||
cJSON *dnodePort = cJSON_GetObjectItem(node, "port");
|
|
||||||
if (!dnodePort || dnodePort->type != cJSON_Number) {
|
|
||||||
dError("failed to read %s since dnodePort not found", file);
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
dnodeEp.ep.port = dnodePort->valueint;
|
|
||||||
|
|
||||||
cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode");
|
|
||||||
if (!isMnode || isMnode->type != cJSON_Number) {
|
|
||||||
dError("failed to read %s since isMnode not found", file);
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
dnodeEp.isMnode = isMnode->valueint;
|
|
||||||
|
|
||||||
taosArrayPush(pData->dnodeEps, &dnodeEp);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
dDebug("succcessed to read file %s", file);
|
dInfo("succceed to read mnode file %s", file);
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (content != NULL) taosMemoryFree(content);
|
if (content != NULL) taosMemoryFree(content);
|
||||||
if (root != NULL) cJSON_Delete(root);
|
if (pJson != NULL) cJSON_Delete(pJson);
|
||||||
if (pFile != NULL) taosCloseFile(&pFile);
|
if (pFile != NULL) taosCloseFile(&pFile);
|
||||||
|
|
||||||
|
if (code != 0) {
|
||||||
|
dError("failed to read dnode file:%s since %s", file, terrstr());
|
||||||
|
}
|
||||||
|
|
||||||
if (taosArrayGetSize(pData->dnodeEps) == 0) {
|
if (taosArrayGetSize(pData->dnodeEps) == 0) {
|
||||||
SDnodeEp dnodeEp = {0};
|
SDnodeEp dnodeEp = {0};
|
||||||
dnodeEp.isMnode = 1;
|
dnodeEp.isMnode = 1;
|
||||||
|
@ -178,7 +163,6 @@ _OVER:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
terrno = code;
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -247,7 +231,7 @@ _OVER:
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno);
|
if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
dInfo("succeed to write dnode file:%s since %s, dnodeVer:%" PRId64, realfile, terrstr(), pData->dnodeVer);
|
dError("failed to write dnode file:%s since %s, dnodeVer:%" PRId64, realfile, terrstr(), pData->dnodeVer);
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,48 +19,81 @@
|
||||||
|
|
||||||
#define MAXLEN 1024
|
#define MAXLEN 1024
|
||||||
|
|
||||||
int32_t dmReadFile(const char *path, const char *name, bool *pDeployed) {
|
static int32_t dmDecodeFile(SJson *pJson, bool *deployed) {
|
||||||
int32_t code = TSDB_CODE_INVALID_JSON_FORMAT;
|
int32_t code = 0;
|
||||||
int64_t len = 0;
|
int32_t value = 0;
|
||||||
char content[MAXLEN + 1] = {0};
|
|
||||||
cJSON *root = NULL;
|
|
||||||
char file[PATH_MAX] = {0};
|
|
||||||
TdFilePtr pFile = NULL;
|
|
||||||
|
|
||||||
|
tjsonGetInt32ValueFromDouble(pJson, "deployed", value, code);
|
||||||
|
if (code < 0) return -1;
|
||||||
|
|
||||||
|
*deployed = (value != 0);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t dmReadFile(const char *path, const char *name, bool *pDeployed) {
|
||||||
|
int32_t code = -1;
|
||||||
|
TdFilePtr pFile = NULL;
|
||||||
|
char *content = NULL;
|
||||||
|
SJson *pJson = NULL;
|
||||||
|
char file[PATH_MAX] = {0};
|
||||||
snprintf(file, sizeof(file), "%s%s%s.json", path, TD_DIRSEP, name);
|
snprintf(file, sizeof(file), "%s%s%s.json", path, TD_DIRSEP, name);
|
||||||
pFile = taosOpenFile(file, TD_FILE_READ);
|
|
||||||
if (pFile == NULL) {
|
if (taosStatFile(file, NULL, NULL) < 0) {
|
||||||
|
dInfo("file:%s not exist", file);
|
||||||
code = 0;
|
code = 0;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
len = taosReadFile(pFile, content, MAXLEN);
|
pFile = taosOpenFile(file, TD_FILE_READ);
|
||||||
if (len <= 0) {
|
if (pFile == NULL) {
|
||||||
dError("failed to read %s since content is null", file);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
dError("failed to open file:%s since %s", file, terrstr());
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
root = cJSON_Parse(content);
|
int64_t size = 0;
|
||||||
if (root == NULL) {
|
if (taosFStatFile(pFile, &size, NULL) < 0) {
|
||||||
dError("failed to read %s since invalid json format", file);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
dError("failed to fstat file:%s since %s", file, terrstr());
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
cJSON *deployed = cJSON_GetObjectItem(root, "deployed");
|
content = taosMemoryMalloc(size + 1);
|
||||||
if (!deployed || deployed->type != cJSON_Number) {
|
if (content == NULL) {
|
||||||
dError("failed to read %s since deployed not found", file);
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosReadFile(pFile, content, size) != size) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
dError("failed to read file:%s since %s", file, terrstr());
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
content[size] = '\0';
|
||||||
|
|
||||||
|
pJson = tjsonParse(content);
|
||||||
|
if (pJson == NULL) {
|
||||||
|
terrno = TSDB_CODE_INVALID_JSON_FORMAT;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dmDecodeFile(pJson, pDeployed) < 0) {
|
||||||
|
terrno = TSDB_CODE_INVALID_JSON_FORMAT;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
*pDeployed = deployed->valueint != 0;
|
|
||||||
|
|
||||||
dDebug("succcessed to read file %s, deployed:%d", file, *pDeployed);
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
dInfo("succceed to read mnode file %s", file);
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (root != NULL) cJSON_Delete(root);
|
if (content != NULL) taosMemoryFree(content);
|
||||||
|
if (pJson != NULL) cJSON_Delete(pJson);
|
||||||
if (pFile != NULL) taosCloseFile(&pFile);
|
if (pFile != NULL) taosCloseFile(&pFile);
|
||||||
|
|
||||||
terrno = code;
|
if (code != 0) {
|
||||||
|
dError("failed to read dnode file:%s since %s", file, terrstr());
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2339,32 +2339,33 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
|
||||||
SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
|
SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
|
|
||||||
|
TSDBROW *pRow = NULL, *piRow = NULL;
|
||||||
int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;
|
int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;
|
||||||
if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) {
|
if (pBlockScanInfo->iter.hasVal) {
|
||||||
return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
|
pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
|
||||||
} else {
|
|
||||||
TSDBROW *pRow = NULL, *piRow = NULL;
|
|
||||||
if (pBlockScanInfo->iter.hasVal) {
|
|
||||||
pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pBlockScanInfo->iiter.hasVal) {
|
|
||||||
piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
|
|
||||||
}
|
|
||||||
|
|
||||||
// imem + file + last block
|
|
||||||
if (pBlockScanInfo->iiter.hasVal) {
|
|
||||||
return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pLastBlockReader);
|
|
||||||
}
|
|
||||||
|
|
||||||
// mem + file + last block
|
|
||||||
if (pBlockScanInfo->iter.hasVal) {
|
|
||||||
return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pLastBlockReader);
|
|
||||||
}
|
|
||||||
|
|
||||||
// files data blocks + last block
|
|
||||||
return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, key, pBlockScanInfo, pBlockData);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pBlockScanInfo->iiter.hasVal) {
|
||||||
|
piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
|
||||||
|
}
|
||||||
|
|
||||||
|
// two levels of mem-table does contain the valid rows
|
||||||
|
if (pRow != NULL && piRow != NULL) {
|
||||||
|
return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
|
||||||
|
}
|
||||||
|
|
||||||
|
// imem + file + last block
|
||||||
|
if (pBlockScanInfo->iiter.hasVal) {
|
||||||
|
return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pLastBlockReader);
|
||||||
|
}
|
||||||
|
|
||||||
|
// mem + file + last block
|
||||||
|
if (pBlockScanInfo->iter.hasVal) {
|
||||||
|
return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pLastBlockReader);
|
||||||
|
}
|
||||||
|
|
||||||
|
// files data blocks + last block
|
||||||
|
return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, key, pBlockScanInfo, pBlockData);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo,
|
static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo,
|
||||||
|
|
|
@ -682,6 +682,18 @@ int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRo
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);
|
tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);
|
||||||
|
if ((!COL_VAL_IS_NONE(pColVal)) && (!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
|
||||||
|
uint8_t *pVal = pColVal->value.pData;
|
||||||
|
|
||||||
|
pColVal->value.pData = NULL;
|
||||||
|
code = tRealloc(&pColVal->value.pData, pColVal->value.nData);
|
||||||
|
if (code) goto _exit;
|
||||||
|
|
||||||
|
if (pColVal->value.nData) {
|
||||||
|
memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
|
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
|
@ -720,12 +732,35 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
|
||||||
|
|
||||||
if (key.version > pMerger->version) {
|
if (key.version > pMerger->version) {
|
||||||
if (!COL_VAL_IS_NONE(pColVal)) {
|
if (!COL_VAL_IS_NONE(pColVal)) {
|
||||||
taosArraySet(pMerger->pArray, iCol, pColVal);
|
if ((!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
|
||||||
|
SColVal *tColVal = taosArrayGet(pMerger->pArray, iCol);
|
||||||
|
code = tRealloc(&tColVal->value.pData, pColVal->value.nData);
|
||||||
|
if (code) return code;
|
||||||
|
|
||||||
|
tColVal->value.nData = pColVal->value.nData;
|
||||||
|
if (pColVal->value.nData) {
|
||||||
|
memcpy(tColVal->value.pData, pColVal->value.pData, pColVal->value.nData);
|
||||||
|
}
|
||||||
|
tColVal->flag = 0;
|
||||||
|
} else {
|
||||||
|
taosArraySet(pMerger->pArray, iCol, pColVal);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if (key.version < pMerger->version) {
|
} else if (key.version < pMerger->version) {
|
||||||
SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol);
|
SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol);
|
||||||
if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {
|
if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {
|
||||||
taosArraySet(pMerger->pArray, iCol, pColVal);
|
if ((!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
|
||||||
|
code = tRealloc(&tColVal->value.pData, pColVal->value.nData);
|
||||||
|
if (code) return code;
|
||||||
|
|
||||||
|
tColVal->value.nData = pColVal->value.nData;
|
||||||
|
if (pColVal->value.nData) {
|
||||||
|
memcpy(tColVal->value.pData, pColVal->value.pData, pColVal->value.nData);
|
||||||
|
}
|
||||||
|
tColVal->flag = 0;
|
||||||
|
} else {
|
||||||
|
taosArraySet(pMerger->pArray, iCol, pColVal);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0 && "dup versions not allowed");
|
ASSERT(0 && "dup versions not allowed");
|
||||||
|
@ -765,6 +800,18 @@ int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
|
||||||
// other
|
// other
|
||||||
for (int16_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) {
|
for (int16_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) {
|
||||||
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
|
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
|
||||||
|
if ((!COL_VAL_IS_NONE(pColVal)) && (!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
|
||||||
|
uint8_t *pVal = pColVal->value.pData;
|
||||||
|
|
||||||
|
pColVal->value.pData = NULL;
|
||||||
|
code = tRealloc(&pColVal->value.pData, pColVal->value.nData);
|
||||||
|
if (code) goto _exit;
|
||||||
|
|
||||||
|
if (pColVal->value.nData) {
|
||||||
|
memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
|
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
|
@ -775,7 +822,16 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tRowMergerClear(SRowMerger *pMerger) { taosArrayDestroy(pMerger->pArray); }
|
void tRowMergerClear(SRowMerger *pMerger) {
|
||||||
|
for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) {
|
||||||
|
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
|
||||||
|
if (IS_VAR_DATA_TYPE(pTColVal->type)) {
|
||||||
|
tFree(pTColVal->value.pData);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pMerger->pArray);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow) {
|
int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -789,12 +845,47 @@ int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow) {
|
||||||
|
|
||||||
if (key.version > pMerger->version) {
|
if (key.version > pMerger->version) {
|
||||||
if (!COL_VAL_IS_NONE(pColVal)) {
|
if (!COL_VAL_IS_NONE(pColVal)) {
|
||||||
taosArraySet(pMerger->pArray, iCol, pColVal);
|
if (IS_VAR_DATA_TYPE(pColVal->type)) {
|
||||||
|
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
|
||||||
|
if (!COL_VAL_IS_NULL(pColVal)) {
|
||||||
|
code = tRealloc(&pTColVal->value.pData, pColVal->value.nData);
|
||||||
|
if (code) goto _exit;
|
||||||
|
|
||||||
|
pTColVal->value.nData = pColVal->value.nData;
|
||||||
|
if (pTColVal->value.nData) {
|
||||||
|
memcpy(pTColVal->value.pData, pColVal->value.pData, pTColVal->value.nData);
|
||||||
|
}
|
||||||
|
pTColVal->flag = 0;
|
||||||
|
} else {
|
||||||
|
tFree(pTColVal->value.pData);
|
||||||
|
pTColVal->value.pData = NULL;
|
||||||
|
taosArraySet(pMerger->pArray, iCol, pColVal);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
taosArraySet(pMerger->pArray, iCol, pColVal);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if (key.version < pMerger->version) {
|
} else if (key.version < pMerger->version) {
|
||||||
SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol);
|
SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol);
|
||||||
if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {
|
if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {
|
||||||
taosArraySet(pMerger->pArray, iCol, pColVal);
|
if (IS_VAR_DATA_TYPE(pColVal->type)) {
|
||||||
|
if (!COL_VAL_IS_NULL(pColVal)) {
|
||||||
|
code = tRealloc(&tColVal->value.pData, pColVal->value.nData);
|
||||||
|
if (code) goto _exit;
|
||||||
|
|
||||||
|
tColVal->value.nData = pColVal->value.nData;
|
||||||
|
if (tColVal->value.nData) {
|
||||||
|
memcpy(tColVal->value.pData, pColVal->value.pData, tColVal->value.nData);
|
||||||
|
}
|
||||||
|
tColVal->flag = 0;
|
||||||
|
} else {
|
||||||
|
tFree(tColVal->value.pData);
|
||||||
|
tColVal->value.pData = NULL;
|
||||||
|
taosArraySet(pMerger->pArray, iCol, pColVal);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
taosArraySet(pMerger->pArray, iCol, pColVal);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
|
|
@ -279,7 +279,6 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
||||||
// for stream interval
|
// for stream interval
|
||||||
if (pBlock->info.type == STREAM_RETRIEVE || pBlock->info.type == STREAM_DELETE_RESULT ||
|
if (pBlock->info.type == STREAM_RETRIEVE || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||||
pBlock->info.type == STREAM_DELETE_DATA) {
|
pBlock->info.type == STREAM_DELETE_DATA) {
|
||||||
// printDataBlock1(pBlock, "project1");
|
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -173,7 +173,7 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro
|
||||||
if (NULL == *pPage) {
|
if (NULL == *pPage) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
return (SResultRow*)((char*)(*pPage) + p1->offset);
|
return (SResultRow*)((char*)(*pPage) + p1->offset);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1729,6 +1729,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
/*resetTableScanInfo(pTSInfo, pWin);*/
|
/*resetTableScanInfo(pTSInfo, pWin);*/
|
||||||
tsdbReaderClose(pTSInfo->base.dataReader);
|
tsdbReaderClose(pTSInfo->base.dataReader);
|
||||||
pTSInfo->base.dataReader = NULL;
|
pTSInfo->base.dataReader = NULL;
|
||||||
|
pInfo->pTableScanOp->status = OP_OPENED;
|
||||||
|
|
||||||
pTSInfo->scanTimes = 0;
|
pTSInfo->scanTimes = 0;
|
||||||
pTSInfo->currentGroupId = -1;
|
pTSInfo->currentGroupId = -1;
|
||||||
|
|
|
@ -1918,6 +1918,13 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
|
||||||
colDataAppend(pColInfo, 0, p, false);
|
colDataAppend(pColInfo, 0, p, false);
|
||||||
taosMemoryFree(p);
|
taosMemoryFree(p);
|
||||||
|
|
||||||
|
// make the valgrind happy that all memory buffer has been initialized already.
|
||||||
|
if (slotId != 0) {
|
||||||
|
SColumnInfoData* p1 = taosArrayGet(pBlock->pDataBlock, 0);
|
||||||
|
int64_t v = 0;
|
||||||
|
colDataAppendInt64(p1, 0, &v);
|
||||||
|
}
|
||||||
|
|
||||||
pBlock->info.rows = 1;
|
pBlock->info.rows = 1;
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
return pBlock;
|
return pBlock;
|
||||||
|
|
|
@ -1688,7 +1688,9 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SVariant* pVal = &pCtx->param[1].param;
|
SVariant* pVal = &pCtx->param[1].param;
|
||||||
|
int32_t code = 0;
|
||||||
double v = 0;
|
double v = 0;
|
||||||
|
|
||||||
GET_TYPED_DATA(v, double, pVal->nType, &pVal->i);
|
GET_TYPED_DATA(v, double, pVal->nType, &pVal->i);
|
||||||
|
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
@ -1696,14 +1698,14 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
|
|
||||||
tMemBucket* pMemBucket = ppInfo->pMemBucket;
|
tMemBucket* pMemBucket = ppInfo->pMemBucket;
|
||||||
if (pMemBucket != NULL && pMemBucket->total > 0) { // check for null
|
if (pMemBucket != NULL && pMemBucket->total > 0) { // check for null
|
||||||
int32_t code = getPercentile(pMemBucket, v, &ppInfo->result);
|
code = getPercentile(pMemBucket, v, &ppInfo->result);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
tMemBucketDestroy(pMemBucket);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tMemBucketDestroy(pMemBucket);
|
tMemBucketDestroy(pMemBucket);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
return functionFinalize(pCtx, pBlock);
|
return functionFinalize(pCtx, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2670,7 +2672,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
|
||||||
int32_t v = *(int32_t*)pv;
|
int32_t v = *(int32_t*)pv;
|
||||||
int64_t delta = factor * (v - pDiffInfo->prev.i64); // direct previous may be null
|
int64_t delta = factor * (v - pDiffInfo->prev.i64); // direct previous may be null
|
||||||
if (delta < 0 && pDiffInfo->ignoreNegative) {
|
if (delta < 0 && pDiffInfo->ignoreNegative) {
|
||||||
colDataSetNull_f(pOutput->nullbitmap, pos);
|
colDataSetNull_f_s(pOutput, pos);
|
||||||
} else {
|
} else {
|
||||||
colDataAppendInt64(pOutput, pos, &delta);
|
colDataAppendInt64(pOutput, pos, &delta);
|
||||||
}
|
}
|
||||||
|
@ -2683,7 +2685,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
|
||||||
int8_t v = *(int8_t*)pv;
|
int8_t v = *(int8_t*)pv;
|
||||||
int64_t delta = factor * (v - pDiffInfo->prev.i64); // direct previous may be null
|
int64_t delta = factor * (v - pDiffInfo->prev.i64); // direct previous may be null
|
||||||
if (delta < 0 && pDiffInfo->ignoreNegative) {
|
if (delta < 0 && pDiffInfo->ignoreNegative) {
|
||||||
colDataSetNull_f(pOutput->nullbitmap, pos);
|
colDataSetNull_f_s(pOutput, pos);
|
||||||
} else {
|
} else {
|
||||||
colDataAppendInt64(pOutput, pos, &delta);
|
colDataAppendInt64(pOutput, pos, &delta);
|
||||||
}
|
}
|
||||||
|
@ -2694,7 +2696,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
|
||||||
int16_t v = *(int16_t*)pv;
|
int16_t v = *(int16_t*)pv;
|
||||||
int64_t delta = factor * (v - pDiffInfo->prev.i64); // direct previous may be null
|
int64_t delta = factor * (v - pDiffInfo->prev.i64); // direct previous may be null
|
||||||
if (delta < 0 && pDiffInfo->ignoreNegative) {
|
if (delta < 0 && pDiffInfo->ignoreNegative) {
|
||||||
colDataSetNull_f(pOutput->nullbitmap, pos);
|
colDataSetNull_f_s(pOutput, pos);
|
||||||
} else {
|
} else {
|
||||||
colDataAppendInt64(pOutput, pos, &delta);
|
colDataAppendInt64(pOutput, pos, &delta);
|
||||||
}
|
}
|
||||||
|
@ -2706,7 +2708,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
|
||||||
int64_t v = *(int64_t*)pv;
|
int64_t v = *(int64_t*)pv;
|
||||||
int64_t delta = factor * (v - pDiffInfo->prev.i64); // direct previous may be null
|
int64_t delta = factor * (v - pDiffInfo->prev.i64); // direct previous may be null
|
||||||
if (delta < 0 && pDiffInfo->ignoreNegative) {
|
if (delta < 0 && pDiffInfo->ignoreNegative) {
|
||||||
colDataSetNull_f(pOutput->nullbitmap, pos);
|
colDataSetNull_f_s(pOutput, pos);
|
||||||
} else {
|
} else {
|
||||||
colDataAppendInt64(pOutput, pos, &delta);
|
colDataAppendInt64(pOutput, pos, &delta);
|
||||||
}
|
}
|
||||||
|
@ -2717,7 +2719,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
|
||||||
float v = *(float*)pv;
|
float v = *(float*)pv;
|
||||||
double delta = factor * (v - pDiffInfo->prev.d64); // direct previous may be null
|
double delta = factor * (v - pDiffInfo->prev.d64); // direct previous may be null
|
||||||
if ((delta < 0 && pDiffInfo->ignoreNegative) || isinf(delta) || isnan(delta)) { // check for overflow
|
if ((delta < 0 && pDiffInfo->ignoreNegative) || isinf(delta) || isnan(delta)) { // check for overflow
|
||||||
colDataSetNull_f(pOutput->nullbitmap, pos);
|
colDataSetNull_f_s(pOutput, pos);
|
||||||
} else {
|
} else {
|
||||||
colDataAppendDouble(pOutput, pos, &delta);
|
colDataAppendDouble(pOutput, pos, &delta);
|
||||||
}
|
}
|
||||||
|
@ -2728,7 +2730,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
|
||||||
double v = *(double*)pv;
|
double v = *(double*)pv;
|
||||||
double delta = factor * (v - pDiffInfo->prev.d64); // direct previous may be null
|
double delta = factor * (v - pDiffInfo->prev.d64); // direct previous may be null
|
||||||
if ((delta < 0 && pDiffInfo->ignoreNegative) || isinf(delta) || isnan(delta)) { // check for overflow
|
if ((delta < 0 && pDiffInfo->ignoreNegative) || isinf(delta) || isnan(delta)) { // check for overflow
|
||||||
colDataSetNull_f(pOutput->nullbitmap, pos);
|
colDataSetNull_f_s(pOutput, pos);
|
||||||
} else {
|
} else {
|
||||||
colDataAppendDouble(pOutput, pos, &delta);
|
colDataAppendDouble(pOutput, pos, &delta);
|
||||||
}
|
}
|
||||||
|
@ -2763,7 +2765,7 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||||
if (pDiffInfo->includeNull) {
|
if (pDiffInfo->includeNull) {
|
||||||
colDataSetNull_f(pOutput->nullbitmap, pos);
|
colDataSetNull_f_s(pOutput, pos);
|
||||||
|
|
||||||
numOfElems += 1;
|
numOfElems += 1;
|
||||||
}
|
}
|
||||||
|
@ -2801,8 +2803,7 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||||
if (pDiffInfo->includeNull) {
|
if (pDiffInfo->includeNull) {
|
||||||
colDataSetNull_f(pOutput->nullbitmap, pos);
|
colDataSetNull_f_s(pOutput, pos);
|
||||||
|
|
||||||
numOfElems += 1;
|
numOfElems += 1;
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -92,6 +92,7 @@ static void resetPosInfo(SSlotInfo *pInfo) {
|
||||||
|
|
||||||
int32_t findOnlyResult(tMemBucket *pMemBucket, double *result) {
|
int32_t findOnlyResult(tMemBucket *pMemBucket, double *result) {
|
||||||
ASSERT(pMemBucket->total == 1);
|
ASSERT(pMemBucket->total == 1);
|
||||||
|
terrno = 0;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pMemBucket->numOfSlots; ++i) {
|
for (int32_t i = 0; i < pMemBucket->numOfSlots; ++i) {
|
||||||
tMemBucketSlot *pSlot = &pMemBucket->pSlots[i];
|
tMemBucketSlot *pSlot = &pMemBucket->pSlots[i];
|
||||||
|
|
|
@ -3181,6 +3181,7 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, SColumnInfoData *pRe
|
||||||
void *colData = colDataGetData(pData, i);
|
void *colData = colDataGetData(pData, i);
|
||||||
if (colData == NULL || colDataIsNull_s(pData, i)) {
|
if (colData == NULL || colDataIsNull_s(pData, i)) {
|
||||||
all = false;
|
all = false;
|
||||||
|
p[i] = 0;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -635,6 +635,7 @@ int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in
|
||||||
}
|
}
|
||||||
|
|
||||||
void walFsync(SWal *pWal, bool forceFsync) {
|
void walFsync(SWal *pWal, bool forceFsync) {
|
||||||
|
taosThreadMutexLock(&pWal->mutex);
|
||||||
if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) {
|
if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) {
|
||||||
wTrace("vgId:%d, fileId:%" PRId64 ".idx, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));
|
wTrace("vgId:%d, fileId:%" PRId64 ".idx, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));
|
||||||
if (taosFsyncFile(pWal->pIdxFile) < 0) {
|
if (taosFsyncFile(pWal->pIdxFile) < 0) {
|
||||||
|
@ -647,4 +648,5 @@ void walFsync(SWal *pWal, bool forceFsync) {
|
||||||
strerror(errno));
|
strerror(errno));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1039,6 +1039,11 @@
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_null_none.py -Q 2
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_null_none.py -Q 2
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_null_none.py -Q 3
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_null_none.py -Q 3
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_null_none.py -Q 4
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_null_none.py -Q 4
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py -R
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py -Q 2
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py -Q 3
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py -Q 4
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py -Q 4
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py -Q 4
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py -Q 4
|
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py -Q 4
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-20582.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-20582.py
|
||||||
|
|
|
@ -88,6 +88,7 @@ sql insert into car1 values (now, 1, 1,1 ) (now +1s, 2,2,2) car2 values (now, 1,
|
||||||
|
|
||||||
sql select c1+speed from stb where c1 > 0
|
sql select c1+speed from stb where c1 > 0
|
||||||
if $rows != 3 then
|
if $rows != 3 then
|
||||||
|
print $rows , expect 3
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
|
|
@ -600,6 +600,11 @@ class TDTestCase:
|
||||||
tdLog.printNoPrefix("==========step4:after wal, all check again ")
|
tdLog.printNoPrefix("==========step4:after wal, all check again ")
|
||||||
self.all_test()
|
self.all_test()
|
||||||
|
|
||||||
|
# add for TS-2440
|
||||||
|
for i in range(self.rows):
|
||||||
|
tdSql.execute("drop database if exists db3 ")
|
||||||
|
tdSql.execute("create database db3 retentions 1s:4m,2s:8m,3s:12m")
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
tdLog.success(f"{__file__} successfully executed")
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
|
@ -851,6 +851,7 @@ class TDTestCase:
|
||||||
tdLog.info("========mark==%s==="% mark);
|
tdLog.info("========mark==%s==="% mark);
|
||||||
try:
|
try:
|
||||||
tdSql.query(sql,queryTimes=1)
|
tdSql.query(sql,queryTimes=1)
|
||||||
|
self.explain_sql(sql)
|
||||||
except:
|
except:
|
||||||
tdLog.info("sql is not support :=====%s; " %sql)
|
tdLog.info("sql is not support :=====%s; " %sql)
|
||||||
tdSql.error(sql)
|
tdSql.error(sql)
|
||||||
|
@ -4995,9 +4996,7 @@ class TDTestCase:
|
||||||
sql += "%s ;" % random.choice(self.limit_u_where)
|
sql += "%s ;" % random.choice(self.limit_u_where)
|
||||||
tdLog.info(sql)
|
tdLog.info(sql)
|
||||||
tdLog.info(len(sql))
|
tdLog.info(len(sql))
|
||||||
tdSql.query(sql)
|
self.data_check(sql,mark='15-2')
|
||||||
self.cur1.execute(sql)
|
|
||||||
self.explain_sql(sql)
|
|
||||||
|
|
||||||
tdSql.query("select 15-2.2 from stable_1;")
|
tdSql.query("select 15-2.2 from stable_1;")
|
||||||
for i in range(self.fornum):
|
for i in range(self.fornum):
|
||||||
|
@ -5013,9 +5012,7 @@ class TDTestCase:
|
||||||
sql += "%s ;" % random.choice(self.limit_u_where)
|
sql += "%s ;" % random.choice(self.limit_u_where)
|
||||||
tdLog.info(sql)
|
tdLog.info(sql)
|
||||||
tdLog.info(len(sql))
|
tdLog.info(len(sql))
|
||||||
tdSql.query(sql)
|
self.data_check(sql,mark='15-2.2')
|
||||||
self.cur1.execute(sql)
|
|
||||||
self.explain_sql(sql)
|
|
||||||
|
|
||||||
self.restartDnodes()
|
self.restartDnodes()
|
||||||
tdSql.query("select 15-3 from stable_1;")
|
tdSql.query("select 15-3 from stable_1;")
|
||||||
|
@ -5033,9 +5030,7 @@ class TDTestCase:
|
||||||
sql += "%s " % random.choice(self.limit_where)
|
sql += "%s " % random.choice(self.limit_where)
|
||||||
tdLog.info(sql)
|
tdLog.info(sql)
|
||||||
tdLog.info(len(sql))
|
tdLog.info(len(sql))
|
||||||
tdSql.query(sql)
|
self.data_check(sql,mark='15-3')
|
||||||
self.cur1.execute(sql)
|
|
||||||
self.explain_sql(sql)
|
|
||||||
|
|
||||||
tdSql.query("select 15-4 from stable_1;")
|
tdSql.query("select 15-4 from stable_1;")
|
||||||
for i in range(self.fornum):
|
for i in range(self.fornum):
|
||||||
|
@ -5052,9 +5047,7 @@ class TDTestCase:
|
||||||
sql += "%s " % random.choice(self.limit_u_where)
|
sql += "%s " % random.choice(self.limit_u_where)
|
||||||
tdLog.info(sql)
|
tdLog.info(sql)
|
||||||
tdLog.info(len(sql))
|
tdLog.info(len(sql))
|
||||||
tdSql.query(sql)
|
self.data_check(sql,mark='15-4')
|
||||||
self.cur1.execute(sql)
|
|
||||||
self.explain_sql(sql)
|
|
||||||
|
|
||||||
tdSql.query("select 15-4.2 from stable_1;")
|
tdSql.query("select 15-4.2 from stable_1;")
|
||||||
for i in range(self.fornum):
|
for i in range(self.fornum):
|
||||||
|
@ -5087,8 +5080,7 @@ class TDTestCase:
|
||||||
tdLog.info(sql)
|
tdLog.info(sql)
|
||||||
tdLog.info(len(sql))
|
tdLog.info(len(sql))
|
||||||
tdSql.query(sql)
|
tdSql.query(sql)
|
||||||
self.cur1.execute(sql)
|
self.data_check(sql,mark='15-5')
|
||||||
self.explain_sql(sql)
|
|
||||||
|
|
||||||
#16 select * from (select calc_aggregate_regulars as agg from regular_table where <\>\in\and\or order by limit offset )
|
#16 select * from (select calc_aggregate_regulars as agg from regular_table where <\>\in\and\or order by limit offset )
|
||||||
#self.dropandcreateDB_random("%s" %db, 1)
|
#self.dropandcreateDB_random("%s" %db, 1)
|
||||||
|
|
|
@ -0,0 +1,191 @@
|
||||||
|
###################################################################
|
||||||
|
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||||
|
# All rights reserved.
|
||||||
|
#
|
||||||
|
# This file is proprietary and confidential to TAOS Technologies.
|
||||||
|
# No part of this file may be reproduced, stored, transmitted,
|
||||||
|
# disclosed or used in any form or by any means other than as
|
||||||
|
# expressly provided by the written permission from Jianhui Tao
|
||||||
|
#
|
||||||
|
###################################################################
|
||||||
|
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import random
|
||||||
|
|
||||||
|
from util.log import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.dnodes import *
|
||||||
|
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
def init(self, conn, logSql, replicaVar):
|
||||||
|
tdLog.debug("start to execute %s" % __file__)
|
||||||
|
tdSql.init(conn.cursor(), logSql)
|
||||||
|
|
||||||
|
def getBuildPath(self):
|
||||||
|
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
|
||||||
|
if ("community" in selfPath):
|
||||||
|
projPath = selfPath[:selfPath.find("community")]
|
||||||
|
else:
|
||||||
|
projPath = selfPath[:selfPath.find("tests")]
|
||||||
|
|
||||||
|
for root, dirs, files in os.walk(projPath):
|
||||||
|
if ("taosd" in files):
|
||||||
|
rootRealPath = os.path.dirname(os.path.realpath(root))
|
||||||
|
if ("packaging" not in rootRealPath):
|
||||||
|
buildPath = root[:len(root)-len("/build/bin")]
|
||||||
|
break
|
||||||
|
return buildPath
|
||||||
|
|
||||||
|
def run_benchmark(self,dbname,tables,per_table_num,order,replica):
|
||||||
|
#O :Out of order
|
||||||
|
#A :Repliaca
|
||||||
|
buildPath = self.getBuildPath()
|
||||||
|
if (buildPath == ""):
|
||||||
|
tdLog.exit("taosd not found!")
|
||||||
|
else:
|
||||||
|
tdLog.info("taosd found in %s" % buildPath)
|
||||||
|
binPath = buildPath+ "/build/bin/"
|
||||||
|
|
||||||
|
os.system("%staosBenchmark -d %s -t %d -n %d -O %d -a %d -b float,double,nchar\(200\),binary\(50\) -T 50 -y " % (binPath,dbname,tables,per_table_num,order,replica))
|
||||||
|
|
||||||
|
def sql_base(self,dbname):
|
||||||
|
self.check_sub(dbname)
|
||||||
|
sql1 = "select count(*) from %s.meters" %dbname
|
||||||
|
self.sql_base_check(sql1,sql1)
|
||||||
|
|
||||||
|
self.check_sub(dbname)
|
||||||
|
sql2 = "select count(ts) from %s.meters" %dbname
|
||||||
|
self.sql_base_check(sql1,sql2)
|
||||||
|
|
||||||
|
self.check_sub(dbname)
|
||||||
|
sql2 = "select count(_c0) from %s.meters" %dbname
|
||||||
|
self.sql_base_check(sql1,sql2)
|
||||||
|
|
||||||
|
self.check_sub(dbname)
|
||||||
|
sql2 = "select count(c0) from %s.meters" %dbname
|
||||||
|
self.sql_base_check(sql1,sql2)
|
||||||
|
|
||||||
|
self.check_sub(dbname)
|
||||||
|
sql2 = "select count(c1) from %s.meters" %dbname
|
||||||
|
self.sql_base_check(sql1,sql2)
|
||||||
|
|
||||||
|
self.check_sub(dbname)
|
||||||
|
sql2 = "select count(c2) from %s.meters" %dbname
|
||||||
|
self.sql_base_check(sql1,sql2)
|
||||||
|
|
||||||
|
self.check_sub(dbname)
|
||||||
|
sql2 = "select count(c3) from %s.meters" %dbname
|
||||||
|
self.sql_base_check(sql1,sql2)
|
||||||
|
|
||||||
|
self.check_sub(dbname)
|
||||||
|
sql2 = "select count(t0) from %s.meters" %dbname
|
||||||
|
self.sql_base_check(sql1,sql2)
|
||||||
|
|
||||||
|
self.check_sub(dbname)
|
||||||
|
sql2 = "select count(t1) from %s.meters" %dbname
|
||||||
|
self.sql_base_check(sql1,sql2)
|
||||||
|
|
||||||
|
|
||||||
|
self.check_sub(dbname)
|
||||||
|
sql2 = "select count(ts) from (select * from %s.meters)" %dbname
|
||||||
|
self.sql_base_check(sql1,sql2)
|
||||||
|
|
||||||
|
self.check_sub(dbname)
|
||||||
|
sql2 = "select count(_c0) from (select * from %s.meters)" %dbname
|
||||||
|
self.sql_base_check(sql1,sql2)
|
||||||
|
|
||||||
|
self.check_sub(dbname)
|
||||||
|
sql2 = "select count(c0) from (select * from %s.meters)" %dbname
|
||||||
|
self.sql_base_check(sql1,sql2)
|
||||||
|
|
||||||
|
self.check_sub(dbname)
|
||||||
|
sql2 = "select count(c1) from (select * from %s.meters)" %dbname
|
||||||
|
self.sql_base_check(sql1,sql2)
|
||||||
|
|
||||||
|
self.check_sub(dbname)
|
||||||
|
sql2 = "select count(c2) from (select * from %s.meters)" %dbname
|
||||||
|
self.sql_base_check(sql1,sql2)
|
||||||
|
|
||||||
|
self.check_sub(dbname)
|
||||||
|
sql2 = "select count(c3) from (select * from %s.meters)" %dbname
|
||||||
|
self.sql_base_check(sql1,sql2)
|
||||||
|
|
||||||
|
self.check_sub(dbname)
|
||||||
|
sql2 = "select count(t0) from (select * from %s.meters)" %dbname
|
||||||
|
self.sql_base_check(sql1,sql2)
|
||||||
|
|
||||||
|
self.check_sub(dbname)
|
||||||
|
sql2 = "select count(t1) from (select * from %s.meters)" %dbname
|
||||||
|
self.sql_base_check(sql1,sql2)
|
||||||
|
|
||||||
|
|
||||||
|
def sql_base_check(self,sql1,sql2):
|
||||||
|
tdSql.query(sql1)
|
||||||
|
sql1_result = tdSql.getData(0,0)
|
||||||
|
tdLog.info("sql:%s , result: %s" %(sql1,sql1_result))
|
||||||
|
|
||||||
|
tdSql.query(sql2)
|
||||||
|
sql2_result = tdSql.getData(0,0)
|
||||||
|
tdLog.info("sql:%s , result: %s" %(sql2,sql2_result))
|
||||||
|
|
||||||
|
if sql1_result==sql2_result:
|
||||||
|
tdLog.info(f"checkEqual success, sql1_result={sql1_result},sql2_result={sql2_result}")
|
||||||
|
else :
|
||||||
|
tdLog.exit(f"checkEqual error, sql1_result=={sql1_result},sql2_result={sql2_result}")
|
||||||
|
|
||||||
|
def run_sql(self,dbname):
|
||||||
|
self.sql_base(dbname)
|
||||||
|
|
||||||
|
tdSql.execute(" flush database %s;" %dbname)
|
||||||
|
|
||||||
|
self.sql_base(dbname)
|
||||||
|
|
||||||
|
def check_sub(self,dbname):
|
||||||
|
|
||||||
|
sql = "select count(*) from (select distinct(tbname) from %s.meters)" %dbname
|
||||||
|
tdSql.query(sql)
|
||||||
|
num = tdSql.getData(0,0)
|
||||||
|
|
||||||
|
for i in range(0,num):
|
||||||
|
sql1 = "select count(*) from %s.d%d" %(dbname,i)
|
||||||
|
tdSql.query(sql1)
|
||||||
|
sql1_result = tdSql.getData(0,0)
|
||||||
|
tdLog.info("sql:%s , result: %s" %(sql1,sql1_result))
|
||||||
|
|
||||||
|
def check_out_of_order(self,dbname,tables,per_table_num,order,replica):
|
||||||
|
self.run_benchmark(dbname,tables,per_table_num,order,replica)
|
||||||
|
print("sleep 10 seconds")
|
||||||
|
#time.sleep(10)
|
||||||
|
print("sleep 10 seconds finish")
|
||||||
|
|
||||||
|
self.run_sql(dbname)
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
startTime = time.time()
|
||||||
|
|
||||||
|
#self.check_out_of_order('db1',10,random.randint(10000,50000),random.randint(1,10),1)
|
||||||
|
self.check_out_of_order('db1',random.randint(50,200),random.randint(10000,20000),random.randint(1,5),1)
|
||||||
|
|
||||||
|
# self.check_out_of_order('db2',random.randint(50,200),random.randint(10000,50000),random.randint(5,50),1)
|
||||||
|
|
||||||
|
# self.check_out_of_order('db3',random.randint(50,200),random.randint(10000,50000),random.randint(50,100),1)
|
||||||
|
|
||||||
|
# self.check_out_of_order('db4',random.randint(50,200),random.randint(10000,50000),100,1)
|
||||||
|
|
||||||
|
endTime = time.time()
|
||||||
|
print("total time %ds" % (endTime - startTime))
|
||||||
|
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success("%s successfully executed" % __file__)
|
||||||
|
|
||||||
|
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
Loading…
Reference in New Issue