Merge branch 'develop' into feature/unified

This commit is contained in:
jtao1735 2020-05-06 01:14:11 +00:00
commit 62b135b50f
30 changed files with 303 additions and 134 deletions

View File

@ -280,6 +280,7 @@ typedef struct {
SResRec * pGroupRec; SResRec * pGroupRec;
char * data; char * data;
void ** tsrow; void ** tsrow;
int32_t* length; // length for each field for current row
char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t) char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t)
SColumnIndex * pColumnIndex; SColumnIndex * pColumnIndex;
struct SLocalReducer *pLocalReducer; struct SLocalReducer *pLocalReducer;
@ -420,7 +421,7 @@ int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *s
void tscQueueAsyncFreeResult(SSqlObj *pSql); void tscQueueAsyncFreeResult(SSqlObj *pSql);
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo); int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
char * tscGetResultColumnChr(SSqlRes *pRes, SQueryInfo *pQueryInfo, int32_t column, int16_t bytes); void tscGetResultColumnChr(SSqlRes *pRes, SFieldInfo* pFieldInfo, int32_t column);
extern void * pDnodeConn; extern void * pDnodeConn;
extern void * tscCacheHandle; extern void * tscCacheHandle;

View File

@ -317,7 +317,7 @@ void tscProcessFetchRow(SSchedMsg *pMsg) {
SFieldSupInfo* pSup = taosArrayGet(pQueryInfo->fieldsInfo.pSupportInfo, i); SFieldSupInfo* pSup = taosArrayGet(pQueryInfo->fieldsInfo.pSupportInfo, i);
if (pSup->pSqlExpr != NULL) { if (pSup->pSqlExpr != NULL) {
pRes->tsrow[i] = tscGetResultColumnChr(pRes, pQueryInfo, i, pSup->pSqlExpr->resBytes); tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i);
} else { } else {
// todo add // todo add
} }

View File

@ -2965,14 +2965,28 @@ static void tag_project_function(SQLFunctionCtx *pCtx) {
assert(pCtx->inputBytes == pCtx->outputBytes); assert(pCtx->inputBytes == pCtx->outputBytes);
for (int32_t i = 0; i < pCtx->size; ++i) { for (int32_t i = 0; i < pCtx->size; ++i) {
tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->outputType); char* output = pCtx->aOutputBuf;
if (pCtx->tag.nType == TSDB_DATA_TYPE_BINARY || pCtx->tag.nType == TSDB_DATA_TYPE_NCHAR) {
*(int16_t*) output = pCtx->tag.nLen;
output += VARSTR_HEADER_SIZE;
}
tVariantDump(&pCtx->tag, output, pCtx->outputType);
pCtx->aOutputBuf += pCtx->outputBytes; pCtx->aOutputBuf += pCtx->outputBytes;
} }
} }
static void tag_project_function_f(SQLFunctionCtx *pCtx, int32_t index) { static void tag_project_function_f(SQLFunctionCtx *pCtx, int32_t index) {
INC_INIT_VAL(pCtx, 1); INC_INIT_VAL(pCtx, 1);
tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->tag.nType);
char* output = pCtx->aOutputBuf;
if (pCtx->tag.nType == TSDB_DATA_TYPE_BINARY || pCtx->tag.nType == TSDB_DATA_TYPE_NCHAR) {
*(int16_t*) output = pCtx->tag.nLen;
output += VARSTR_HEADER_SIZE;
}
tVariantDump(&pCtx->tag, output, pCtx->tag.nType);
pCtx->aOutputBuf += pCtx->outputBytes; pCtx->aOutputBuf += pCtx->outputBytes;
} }
@ -3007,7 +3021,8 @@ static void tag_function_f(SQLFunctionCtx *pCtx, int32_t index) {
*(int16_t*) output = pCtx->tag.nLen; *(int16_t*) output = pCtx->tag.nLen;
output += VARSTR_HEADER_SIZE; output += VARSTR_HEADER_SIZE;
} }
tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->tag.nType);
tVariantDump(&pCtx->tag, output, pCtx->tag.nType);
} }
static void copy_function(SQLFunctionCtx *pCtx) { static void copy_function(SQLFunctionCtx *pCtx) {

View File

@ -310,7 +310,7 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload,
*payload = TSDB_DATA_BINARY_NULL; *payload = TSDB_DATA_BINARY_NULL;
} else { // too long values will return invalid sql, not be truncated automatically } else { // too long values will return invalid sql, not be truncated automatically
if (pToken->n > pSchema->bytes) { if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
return tscInvalidSQLErrMsg(msg, "string data overflow", pToken->z); return tscInvalidSQLErrMsg(msg, "string data overflow", pToken->z);
} }
@ -328,7 +328,7 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload,
} else { } else {
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long' // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
int32_t resLen = -1; int32_t resLen = -1;
if (!taosMbsToUcs4(pToken->z, pToken->n, payload + VARSTR_HEADER_SIZE, pSchema->bytes, &resLen)) { if (!taosMbsToUcs4(pToken->z, pToken->n, payload + VARSTR_HEADER_SIZE, pSchema->bytes - VARSTR_HEADER_SIZE, &resLen)) {
char buf[512] = {0}; char buf[512] = {0};
snprintf(buf, 512, "%s", strerror(errno)); snprintf(buf, 512, "%s", strerror(errno));

View File

@ -5132,7 +5132,7 @@ static int32_t doAddGroupbyColumnsOnDemand(SQueryInfo* pQueryInfo) {
int16_t colIndex = pColIndex->colIndex; int16_t colIndex = pColIndex->colIndex;
if (colIndex == TSDB_TBNAME_COLUMN_INDEX) { if (colIndex == TSDB_TBNAME_COLUMN_INDEX) {
type = TSDB_DATA_TYPE_BINARY; type = TSDB_DATA_TYPE_BINARY;
bytes = TSDB_TABLE_NAME_LEN; bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE; // todo extract method
name = TSQL_TBNAME_L; name = TSQL_TBNAME_L;
} else { } else {
if (TSDB_COL_IS_TAG(pColIndex->flag)) { if (TSDB_COL_IS_TAG(pColIndex->flag)) {

View File

@ -423,7 +423,7 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
assert(0); assert(0);
for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
pRes->tsrow[i] = tscGetResultColumnChr(pRes, pQueryInfo, i, 0); tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i);
} }
*rows = pRes->tsrow; *rows = pRes->tsrow;
@ -723,6 +723,15 @@ char *taos_get_server_info(TAOS *taos) {
return pObj->sversion; return pObj->sversion;
} }
int* taos_fetch_lengths(TAOS_RES *res) {
SSqlObj* pSql = (SSqlObj* ) res;
if (pSql == NULL || pSql->signature != pSql) {
return NULL;
}
return pSql->res.length;
}
char *taos_get_client_info() { return version; } char *taos_get_client_info() { return version; }
void taos_stop_query(TAOS_RES *res) { void taos_stop_query(TAOS_RES *res) {
@ -794,7 +803,7 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: { case TSDB_DATA_TYPE_NCHAR: {
size_t xlen = 0; size_t xlen = 0;
for (xlen = 0; xlen <= fields[i].bytes; xlen++) { for (xlen = 0; xlen < fields[i].bytes - VARSTR_HEADER_SIZE; xlen++) {
char c = ((char *)row[i])[xlen]; char c = ((char *)row[i])[xlen];
if (c == 0) break; if (c == 0) break;
str[len++] = c; str[len++] = c;

View File

@ -1849,6 +1849,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
if (pRes->tsrow == NULL) { if (pRes->tsrow == NULL) {
pRes->tsrow = calloc(numOfExprs, POINTER_BYTES); pRes->tsrow = calloc(numOfExprs, POINTER_BYTES);
pRes->length = calloc(numOfExprs, sizeof(int32_t));
} }
bool success = false; bool success = false;
@ -1967,7 +1968,7 @@ void **doSetResultRowData(SSqlObj *pSql, bool finalResult) {
for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) { for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
SFieldSupInfo* pSup = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, i); SFieldSupInfo* pSup = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, i);
if (pSup->pSqlExpr != NULL) { if (pSup->pSqlExpr != NULL) {
pRes->tsrow[i] = tscGetResultColumnChr(pRes, pQueryInfo, i, pSup->pSqlExpr->resBytes); tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i);
} }
// primary key column cannot be null in interval query, no need to check // primary key column cannot be null in interval query, no need to check

View File

@ -210,7 +210,7 @@ bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableI
return false; return false;
} }
// order by column exists, not a non-ordered projection query // order by columnIndex exists, not a non-ordered projection query
return pQueryInfo->order.orderColId < 0; return pQueryInfo->order.orderColId < 0;
} }
@ -219,7 +219,7 @@ bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableInde
return false; return false;
} }
// order by column exists, a non-ordered projection query // order by columnIndex exists, a non-ordered projection query
return pQueryInfo->order.orderColId >= 0; return pQueryInfo->order.orderColId >= 0;
} }
@ -286,13 +286,15 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
int32_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput; int32_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput;
pRes->numOfCols = numOfOutput; pRes->numOfCols = numOfOutput;
pRes->tsrow = calloc(POINTER_BYTES, numOfOutput); pRes->tsrow = calloc(numOfOutput, POINTER_BYTES);
pRes->buffer = calloc(POINTER_BYTES, numOfOutput); pRes->length = calloc(numOfOutput, sizeof(int32_t)); // todo refactor
pRes->buffer = calloc(numOfOutput, POINTER_BYTES);
// not enough memory // not enough memory
if (pRes->tsrow == NULL || (pRes->buffer == NULL && pRes->numOfCols > 0)) { if (pRes->tsrow == NULL || (pRes->buffer == NULL && pRes->numOfCols > 0)) {
tfree(pRes->tsrow); tfree(pRes->tsrow);
tfree(pRes->buffer); tfree(pRes->buffer);
tfree(pRes->length);
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
return pRes->code; return pRes->code;
@ -312,6 +314,7 @@ void tscDestroyResPointerInfo(SSqlRes* pRes) {
tfree(pRes->pRsp); tfree(pRes->pRsp);
tfree(pRes->tsrow); tfree(pRes->tsrow);
tfree(pRes->length);
tfree(pRes->pGroupRec); tfree(pRes->pGroupRec);
tfree(pRes->pColumnIndex); tfree(pRes->pColumnIndex);
@ -592,7 +595,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList,
} }
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) { static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) {
// TODO: optimize this function // TODO: optimize this function, handle the case while binary is not presented
int len = 0; int len = 0;
STableMeta* pTableMeta = pTableDataBlock->pTableMeta; STableMeta* pTableMeta = pTableDataBlock->pTableMeta;
@ -927,7 +930,7 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol
SSqlExpr* pExpr = calloc(1, sizeof(SSqlExpr)); SSqlExpr* pExpr = calloc(1, sizeof(SSqlExpr));
pExpr->functionId = functionId; pExpr->functionId = functionId;
// set the correct column index // set the correct columnIndex index
if (pColIndex->columnIndex == TSDB_TBNAME_COLUMN_INDEX) { if (pColIndex->columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
pExpr->colInfo.colId = TSDB_TBNAME_COLUMN_INDEX; pExpr->colInfo.colId = TSDB_TBNAME_COLUMN_INDEX;
} else { } else {
@ -1066,7 +1069,7 @@ void tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy)
} }
SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex) { SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex) {
// ignore the tbname column to be inserted into source list // ignore the tbname columnIndex to be inserted into source list
if (pColIndex->columnIndex < 0) { if (pColIndex->columnIndex < 0) {
return NULL; return NULL;
} }
@ -2127,22 +2130,30 @@ void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) {
} }
} }
char* tscGetResultColumnChr(SSqlRes* pRes, SQueryInfo* pQueryInfo, int32_t column, int16_t bytes) { void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) {
SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo; SFieldSupInfo* pInfo = tscFieldInfoGetSupp(pFieldInfo, columnIndex);
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(pFieldInfo, column); assert(pInfo->pSqlExpr != NULL);
int32_t type = pInfo->pSqlExpr->resType; int32_t type = pInfo->pSqlExpr->resType;
int32_t bytes = pInfo->pSqlExpr->resBytes;
char* pData = ((char*) pRes->data) + pInfo->pSqlExpr->offset * pRes->numOfRows + bytes * pRes->row; char* pData = ((char*) pRes->data) + pInfo->pSqlExpr->offset * pRes->numOfRows + bytes * pRes->row;
if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) { if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) {
int32_t realLen = varDataLen(pData); int32_t realLen = varDataLen(pData);
assert(realLen <= bytes - VARSTR_HEADER_SIZE);
if (realLen < pInfo->pSqlExpr->resBytes - VARSTR_HEADER_SIZE) { // todo refactor if (realLen < pInfo->pSqlExpr->resBytes - VARSTR_HEADER_SIZE) { // todo refactor
*(char*) (pData + realLen + sizeof(int16_t)) = 0; *(char*) (pData + realLen + VARSTR_HEADER_SIZE) = 0;
} }
return pData + VARSTR_HEADER_SIZE; // head is the length of binary/nchar data pRes->tsrow[columnIndex] = pData + VARSTR_HEADER_SIZE;
pRes->length[columnIndex] = realLen;
} else { } else {
return pData; assert(bytes == tDataTypeDesc[type].nSize);
pRes->tsrow[columnIndex] = pData;
pRes->length[columnIndex] = bytes;
} }
} }

View File

@ -32,7 +32,7 @@ extern "C" {
#define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) do {\ #define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) do {\
char* _e = stpncpy((char*)(x) + VARSTR_HEADER_SIZE, (str), (_maxs));\ char* _e = stpncpy((char*)(x) + VARSTR_HEADER_SIZE, (str), (_maxs));\
*(VarDataLenT*)(x) = _e - (x);\ *(VarDataLenT*)(x) = (_e - (x) - VARSTR_HEADER_SIZE);\
} while(0) } while(0)
#define STR_WITH_SIZE_TO_VARSTR(x, str, _size) do {\ #define STR_WITH_SIZE_TO_VARSTR(x, str, _size) do {\

View File

@ -424,8 +424,8 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i), target->numOfPoints, dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i), target->numOfPoints,
target->maxPoints); target->maxPoints);
} }
target->numOfPoints++;
} }
target->numOfPoints++;
} else { } else {
pTarget = tdDupDataCols(target, true); pTarget = tdDupDataCols(target, true);
if (pTarget == NULL) goto _err; if (pTarget == NULL) goto _err;

View File

@ -228,7 +228,7 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
int32_t num = taosGetQueueNumber(pWorker->qset); int32_t num = taosGetQueueNumber(pWorker->qset);
if (num > 0) { if (num > 0) {
usleep(30); usleep(30000);
sched_yield(); sched_yield();
} else { } else {
taosFreeQall(pWorker->qall); taosFreeQall(pWorker->qall);

View File

@ -53,9 +53,9 @@ typedef enum {
} TSDB_OPTION; } TSDB_OPTION;
typedef struct taosField { typedef struct taosField {
char name[64]; char name[64];
short bytes; short bytes;
char type; uint8_t type;
} TAOS_FIELD; } TAOS_FIELD;
#ifdef _TD_GO_DLL_ #ifdef _TD_GO_DLL_
@ -104,6 +104,8 @@ DLL_EXPORT void taos_stop_query(TAOS_RES *res);
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows); int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows);
int taos_validate_sql(TAOS *taos, const char *sql); int taos_validate_sql(TAOS *taos, const char *sql);
int* taos_fetch_lengths(TAOS_RES *res);
// TAOS_RES *taos_list_tables(TAOS *mysql, const char *wild); // TAOS_RES *taos_list_tables(TAOS *mysql, const char *wild);
// TAOS_RES *taos_list_dbs(TAOS *mysql, const char *wild); // TAOS_RES *taos_list_dbs(TAOS *mysql, const char *wild);

View File

@ -36,14 +36,17 @@ extern "C" {
typedef int32_t VarDataOffsetT; typedef int32_t VarDataOffsetT;
typedef int16_t VarDataLenT; typedef int16_t VarDataLenT;
#define VARSTR_HEADER_SIZE sizeof(VarDataLenT)
#define varDataLen(v) ((VarDataLenT *)(v))[0] #define varDataLen(v) ((VarDataLenT *)(v))[0]
#define varDataTLen(v) (sizeof(VarDataLenT) + varDataLen(v)) #define varDataTLen(v) (sizeof(VarDataLenT) + varDataLen(v))
#define varDataVal(v) ((void *)((char *)v + sizeof(VarDataLenT))) #define varDataVal(v) ((void *)((char *)v + sizeof(VarDataLenT)))
#define varDataCopy(dst, v) memcpy((dst), (void*) (v), varDataTLen(v)) #define varDataCopy(dst, v) memcpy((dst), (void*) (v), varDataTLen(v))
#define varDataLenByData(v) (*(VarDataLenT *)(((char*)(v)) - VARSTR_HEADER_SIZE))
// this data type is internally used only in 'in' query to hold the values // this data type is internally used only in 'in' query to hold the values
#define TSDB_DATA_TYPE_ARRAY (TSDB_DATA_TYPE_NCHAR + 1) #define TSDB_DATA_TYPE_ARRAY (TSDB_DATA_TYPE_NCHAR + 1)
#define VARSTR_HEADER_SIZE sizeof(VarDataLenT)
// Bytes for each type. // Bytes for each type.
extern const int32_t TYPE_BYTES[11]; extern const int32_t TYPE_BYTES[11];

View File

@ -350,6 +350,8 @@ int shellDumpResult(TAOS *con, char *fname, int *error_no, bool printMode) {
TAOS_FIELD *fields = taos_fetch_fields(result); TAOS_FIELD *fields = taos_fetch_fields(result);
row = taos_fetch_row(result); row = taos_fetch_row(result);
int32_t* length = taos_fetch_lengths(result);
char t_str[TSDB_MAX_BYTES_PER_ROW] = "\0"; char t_str[TSDB_MAX_BYTES_PER_ROW] = "\0";
int l[TSDB_MAX_COLUMNS] = {0}; int l[TSDB_MAX_COLUMNS] = {0};
int maxLenColumnName = 0; int maxLenColumnName = 0;
@ -457,7 +459,7 @@ int shellDumpResult(TAOS *con, char *fname, int *error_no, bool printMode) {
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
memset(t_str, 0, TSDB_MAX_BYTES_PER_ROW); memset(t_str, 0, TSDB_MAX_BYTES_PER_ROW);
memcpy(t_str, row[i], fields[i].bytes); memcpy(t_str, row[i], length[i]);
/* printf("%-*s|",max(fields[i].bytes, strlen(fields[i].name)), /* printf("%-*s|",max(fields[i].bytes, strlen(fields[i].name)),
* t_str); */ * t_str); */
/* printf("%-*s|", l[i], t_str); */ /* printf("%-*s|", l[i], t_str); */
@ -532,7 +534,8 @@ int shellDumpResult(TAOS *con, char *fname, int *error_no, bool printMode) {
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
memset(t_str, 0, TSDB_MAX_BYTES_PER_ROW); memset(t_str, 0, TSDB_MAX_BYTES_PER_ROW);
memcpy(t_str, row[i], fields[i].bytes); memcpy(t_str, row[i], length[i]);
l[i] = MAX(fields[i].bytes, strlen(fields[i].name)); l[i] = MAX(fields[i].bytes, strlen(fields[i].name));
shellPrintNChar(t_str, l[i], printMode); shellPrintNChar(t_str, l[i], printMode);
break; break;
@ -610,7 +613,7 @@ int shellDumpResult(TAOS *con, char *fname, int *error_no, bool printMode) {
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
memset(t_str, 0, TSDB_MAX_BYTES_PER_ROW); memset(t_str, 0, TSDB_MAX_BYTES_PER_ROW);
memcpy(t_str, row[i], fields[i].bytes); memcpy(t_str, row[i], length[i]);
fprintf(fp, "\'%s\'", t_str); fprintf(fp, "\'%s\'", t_str);
break; break;
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:

View File

@ -480,7 +480,7 @@ static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn)
} }
#endif #endif
pShow->bytes[cols] = 24; pShow->bytes[cols] = 24 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "keep1,keep2,keep(D)"); strcpy(pSchema[cols].name, "keep1,keep2,keep(D)");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
@ -540,13 +540,13 @@ static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn)
} }
#endif #endif
pShow->bytes[cols] = 3; pShow->bytes[cols] = 3 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "precision"); strcpy(pSchema[cols].name, "precision");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 10; pShow->bytes[cols] = 10 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "status"); strcpy(pSchema[cols].name, "status");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
@ -672,7 +672,7 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
char *prec = (pDb->cfg.precision == TSDB_TIME_PRECISION_MILLI) ? TSDB_TIME_PRECISION_MILLI_STR char *prec = (pDb->cfg.precision == TSDB_TIME_PRECISION_MILLI) ? TSDB_TIME_PRECISION_MILLI_STR
: TSDB_TIME_PRECISION_MICRO_STR; : TSDB_TIME_PRECISION_MICRO_STR;
strcpy(pWrite, prec); STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2);
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;

View File

@ -489,7 +489,7 @@ static int32_t mgmtGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 40; pShow->bytes[cols] = 40 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "end_point"); strcpy(pSchema[cols].name, "end_point");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
@ -507,7 +507,7 @@ static int32_t mgmtGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 12; pShow->bytes[cols] = 12 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "status"); strcpy(pSchema[cols].name, "status");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
@ -607,19 +607,19 @@ static int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 40; pShow->bytes[cols] = 40 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "end point"); strcpy(pSchema[cols].name, "end point");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 8; pShow->bytes[cols] = 8 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "module"); strcpy(pSchema[cols].name, "module");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 8; pShow->bytes[cols] = 8 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "status"); strcpy(pSchema[cols].name, "status");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
@ -711,13 +711,13 @@ static int32_t mgmtGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
SSchema *pSchema = pMeta->schema; SSchema *pSchema = pMeta->schema;
pShow->bytes[cols] = TSDB_CFG_OPTION_LEN; pShow->bytes[cols] = TSDB_CFG_OPTION_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "config name"); strcpy(pSchema[cols].name, "config name");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = TSDB_CFG_VALUE_LEN; pShow->bytes[cols] = TSDB_CFG_VALUE_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "config value"); strcpy(pSchema[cols].name, "config value");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
@ -804,7 +804,7 @@ static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 12; pShow->bytes[cols] = 12 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "status"); strcpy(pSchema[cols].name, "status");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);

View File

@ -295,7 +295,7 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 12; pShow->bytes[cols] = 12 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "role"); strcpy(pSchema[cols].name, "role");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);

View File

@ -143,7 +143,9 @@ static void *sdbGetTableFromId(int32_t tableId) {
static int32_t sdbInitWal() { static int32_t sdbInitWal() {
SWalCfg walCfg = {.commitLog = 2, .wals = 2, .keep = 1}; SWalCfg walCfg = {.commitLog = 2, .wals = 2, .keep = 1};
tsSdbObj.wal = walOpen(tsMnodeDir, &walCfg); char temp[TSDB_FILENAME_LEN];
sprintf(temp, "%s/wal", tsMnodeDir);
tsSdbObj.wal = walOpen(temp, &walCfg);
if (tsSdbObj.wal == NULL) { if (tsSdbObj.wal == NULL) {
sdbError("failed to open sdb wal in %s", tsMnodeDir); sdbError("failed to open sdb wal in %s", tsMnodeDir);
return -1; return -1;
@ -196,8 +198,7 @@ static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, int32
} }
static int sdbGetWalInfo(void *ahandle, char *name, uint32_t *index) { static int sdbGetWalInfo(void *ahandle, char *name, uint32_t *index) {
strcpy(name, "wal0"); return walGetWalFile(tsSdbObj.wal, name, index);
return 0;
} }
static void sdbNotifyRole(void *ahandle, int8_t role) { static void sdbNotifyRole(void *ahandle, int8_t role) {
@ -281,7 +282,7 @@ void sdbUpdateSync() {
syncInfo.vgId = 1; syncInfo.vgId = 1;
syncInfo.version = sdbGetVersion(); syncInfo.version = sdbGetVersion();
syncInfo.syncCfg = syncCfg; syncInfo.syncCfg = syncCfg;
sprintf(syncInfo.path, "%s/", tsMnodeDir); sprintf(syncInfo.path, "%s", tsMnodeDir);
syncInfo.ahandle = NULL; syncInfo.ahandle = NULL;
syncInfo.getWalInfo = sdbGetWalInfo; syncInfo.getWalInfo = sdbGetWalInfo;
syncInfo.getFileInfo = sdbGetFileInfo; syncInfo.getFileInfo = sdbGetFileInfo;
@ -458,6 +459,10 @@ static int sdbWrite(void *param, void *data, int type) {
// for data from WAL or forward, version may be smaller // for data from WAL or forward, version may be smaller
if (pHead->version <= tsSdbObj.version) { if (pHead->version <= tsSdbObj.version) {
pthread_mutex_unlock(&tsSdbObj.mutex); pthread_mutex_unlock(&tsSdbObj.mutex);
if (type == TAOS_QTYPE_FWD && tsSdbObj.sync != NULL) {
sdbTrace("forward request is received, version:%" PRIu64 " confirm it", pHead->version);
syncConfirmForward(tsSdbObj.sync, pHead->version, TSDB_CODE_SUCCESS);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (pHead->version != tsSdbObj.version + 1) { } else if (pHead->version != tsSdbObj.version + 1) {
pthread_mutex_unlock(&tsSdbObj.mutex); pthread_mutex_unlock(&tsSdbObj.mutex);

View File

@ -20,6 +20,7 @@
#include "tutil.h" #include "tutil.h"
#include "tglobal.h" #include "tglobal.h"
#include "tgrant.h" #include "tgrant.h"
#include "tdataformat.h"
#include "dnode.h" #include "dnode.h"
#include "mgmtDef.h" #include "mgmtDef.h"
#include "mgmtLog.h" #include "mgmtLog.h"
@ -256,13 +257,13 @@ static int32_t mgmtGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCon
int32_t cols = 0; int32_t cols = 0;
SSchema *pSchema = pMeta->schema; SSchema *pSchema = pMeta->schema;
pShow->bytes[cols] = TSDB_USER_LEN; pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "name"); strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 6; pShow->bytes[cols] = 8 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "privilege"); strcpy(pSchema[cols].name, "privilege");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
@ -270,7 +271,7 @@ static int32_t mgmtGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCon
pShow->bytes[cols] = 8; pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create time"); strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
@ -303,16 +304,16 @@ static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void
cols = 0; cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, pUser->user); STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pUser->user, TSDB_USER_LEN);
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
if (pUser->superAuth) { if (pUser->superAuth) {
strcpy(pWrite, "super"); STR_WITH_SIZE_TO_VARSTR(pWrite, "super", 5);
} else if (pUser->writeAuth) { } else if (pUser->writeAuth) {
strcpy(pWrite, "write"); STR_WITH_SIZE_TO_VARSTR(pWrite, "writable", 8);
} else { } else {
strcpy(pWrite, "read"); STR_WITH_SIZE_TO_VARSTR(pWrite, "readable", 8);
} }
cols++; cols++;

View File

@ -24,6 +24,7 @@
#include "tbalance.h" #include "tbalance.h"
#include "tglobal.h" #include "tglobal.h"
#include "dnode.h" #include "dnode.h"
#include "tdataformat.h"
#include "mgmtDef.h" #include "mgmtDef.h"
#include "mgmtLog.h" #include "mgmtLog.h"
#include "mgmtDb.h" #include "mgmtDb.h"
@ -373,9 +374,9 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 9; pShow->bytes[cols] = 9 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "vgroup status"); strcpy(pSchema[cols].name, "vgroup_status");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
@ -408,13 +409,13 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 40; pShow->bytes[cols] = 40 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "end point"); strcpy(pSchema[cols].name, "end_point");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 9; pShow->bytes[cols] = 9 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "vstatus"); strcpy(pSchema[cols].name, "vstatus");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
@ -474,7 +475,8 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, pVgroup->status ? "updating" : "ready"); char* status = pVgroup->status? "updating" : "ready";
STR_TO_VARSTR(pWrite, status);
cols++; cols++;
for (int32_t i = 0; i < maxReplica; ++i) { for (int32_t i = 0; i < maxReplica; ++i) {
@ -486,18 +488,20 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo
if (pDnode != NULL) { if (pDnode != NULL) {
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strncpy(pWrite, pDnode->dnodeEp, pShow->bytes[cols]-1); STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDnode->dnodeEp, pShow->bytes[cols] - VARSTR_HEADER_SIZE);
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, mgmtGetMnodeRoleStr(pVgroup->vnodeGid[i].role)); status = mgmtGetMnodeRoleStr(pVgroup->vnodeGid[i].role);
STR_TO_VARSTR(pWrite, status);
cols++; cols++;
} else { } else {
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, "null"); STR_WITH_SIZE_TO_VARSTR(pWrite, "NULL", 4);
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, "null"); STR_WITH_SIZE_TO_VARSTR(pWrite, "NULL", 4);
cols++; cols++;
} }
} }

View File

@ -1427,7 +1427,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
int32_t index = pSqlFuncMsg->colInfo.colIndex; int32_t index = pSqlFuncMsg->colInfo.colIndex;
if (TSDB_COL_IS_TAG(pIndex->flag)) { if (TSDB_COL_IS_TAG(pIndex->flag)) {
if (pIndex->colId == TSDB_TBNAME_COLUMN_INDEX) { if (pIndex->colId == TSDB_TBNAME_COLUMN_INDEX) {
pCtx->inputBytes = TSDB_TABLE_NAME_LEN; pCtx->inputBytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
pCtx->inputType = TSDB_DATA_TYPE_BINARY; pCtx->inputType = TSDB_DATA_TYPE_BINARY;
} else { } else {
pCtx->inputBytes = pQuery->tagColList[index].bytes; pCtx->inputBytes = pQuery->tagColList[index].bytes;
@ -5528,7 +5528,7 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo
bytes = tDataTypeDesc[type].nSize; bytes = tDataTypeDesc[type].nSize;
} else if (pExprs[i].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { // parse the normal column } else if (pExprs[i].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { // parse the normal column
type = TSDB_DATA_TYPE_BINARY; type = TSDB_DATA_TYPE_BINARY;
bytes = TSDB_TABLE_NAME_LEN; bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
} else{ } else{
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols); int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols);
assert(j < pQueryMsg->numOfCols || j < pQueryMsg->numOfTags); assert(j < pQueryMsg->numOfCols || j < pQueryMsg->numOfTags);

View File

@ -32,8 +32,6 @@
#include "rpcCache.h" #include "rpcCache.h"
#include "rpcTcp.h" #include "rpcTcp.h"
#include "rpcHead.h" #include "rpcHead.h"
#include "shash.h"
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) #define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))
#define rpcHeadFromCont(cont) ((SRpcHead *) (cont - sizeof(SRpcHead))) #define rpcHeadFromCont(cont) ((SRpcHead *) (cont - sizeof(SRpcHead)))
@ -262,9 +260,7 @@ void *rpcOpen(const SRpcInit *pInit) {
} }
if (pRpc->connType == TAOS_CONN_SERVER) { if (pRpc->connType == TAOS_CONN_SERVER) {
pRpc->hash = taosInitStrHash(pRpc->sessions, sizeof(pRpc), taosHashString); pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true);
// pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true);
if (pRpc->hash == NULL) { if (pRpc->hash == NULL) {
tError("%s failed to init string hash", pRpc->label); tError("%s failed to init string hash", pRpc->label);
rpcClose(pRpc); rpcClose(pRpc);
@ -298,8 +294,7 @@ void rpcClose(void *param) {
(*taosCleanUpConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle); (*taosCleanUpConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle);
(*taosCleanUpConn[pRpc->connType])(pRpc->udphandle); (*taosCleanUpConn[pRpc->connType])(pRpc->udphandle);
// taosHashCleanup(pRpc->hash); taosHashCleanup(pRpc->hash);
taosCleanUpStrHash(pRpc->hash);
taosTmrCleanUp(pRpc->tmrCtrl); taosTmrCleanUp(pRpc->tmrCtrl);
taosIdPoolCleanUp(pRpc->idPool); taosIdPoolCleanUp(pRpc->idPool);
rpcCloseConnCache(pRpc->pCache); rpcCloseConnCache(pRpc->pCache);
@ -548,9 +543,8 @@ static void rpcCloseConn(void *thandle) {
if ( pRpc->connType == TAOS_CONN_SERVER) { if ( pRpc->connType == TAOS_CONN_SERVER) {
char hashstr[40] = {0}; char hashstr[40] = {0};
/*size_t size = */sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType); size_t size = sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType);
// taosHashRemove(pRpc->hash, hashstr, size); taosHashRemove(pRpc->hash, hashstr, size);
taosDeleteStrHash(pRpc->hash, hashstr);
rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg
pConn->pRspMsg = NULL; pConn->pRspMsg = NULL;
@ -599,12 +593,10 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
char hashstr[40] = {0}; char hashstr[40] = {0};
SRpcHead *pHead = (SRpcHead *)pRecv->msg; SRpcHead *pHead = (SRpcHead *)pRecv->msg;
/*size_t size = */sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType); size_t size = sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType);
// check if it is already allocated // check if it is already allocated
SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr)); SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size));
// SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size));
if (ppConn) pConn = *ppConn; if (ppConn) pConn = *ppConn;
if (pConn) return pConn; if (pConn) return pConn;
@ -638,10 +630,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
pConn->localPort = (pRpc->localPort + pRpc->index); pConn->localPort = (pRpc->localPort + pRpc->index);
} }
taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn); taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES);
// taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES);
tTrace("%s %p, rpc connection is allocated, sid:%d id:%s port:%u", tTrace("%s %p, rpc connection is allocated, sid:%d id:%s port:%u",
pRpc->label, pConn, sid, pConn->user, pConn->localPort); pRpc->label, pConn, sid, pConn->user, pConn->localPort);
} }
@ -803,6 +792,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
sid = pConn->sid; sid = pConn->sid;
pConn->chandle = pRecv->chandle; pConn->chandle = pRecv->chandle;
pConn->peerIp = pRecv->ip;
if (pConn->peerPort == 0) pConn->peerPort = pRecv->port; if (pConn->peerPort == 0) pConn->peerPort = pRecv->port;
if (pHead->port) pConn->peerPort = htons(pHead->port); if (pHead->port) pConn->peerPort = htons(pHead->port);

124
src/util/inc/tcoding.h Normal file
View File

@ -0,0 +1,124 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_CODING_H_
#define _TD_CODING_H_
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <string.h>
#include "tutil.h"
const int TNUMBER = 1;
#define IS_LITTLE_ENDIAN() (*(char *)(&TNUMBER) != 0)
static FORCE_INLINE void *taosEncodeFixed16(void *buf, uint16_t value) {
if (IS_LITTLE_ENDIAN()) {
memcpy(buf, &value, sizeof(value));
} else {
((char *)buf)[0] = value & 0xff;
((char *)buf)[1] = (value >> 8) & 0xff;
}
return POINTER_DRIFT(buf, sizeof(value));
}
static FORCE_INLINE void *taosEncodeFixed32(void *buf, uint32_t value) {
if (IS_LITTLE_ENDIAN()) {
memcpy(buf, &value, sizeof(value));
} else {
((char *)buf)[0] = value & 0xff;
((char *)buf)[1] = (value >> 8) & 0xff;
((char *)buf)[2] = (value >> 16) & 0xff;
((char *)buf)[3] = (value >> 24) & 0xff;
}
return POINTER_DRIFT(buf, sizeof(value));
}
static FORCE_INLINE void *taosEncodeFixed64(void *buf, uint64_t value) {
if (IS_LITTLE_ENDIAN()) {
memcpy(buf, &value, sizeof(value));
} else {
((char *)buf)[0] = value & 0xff;
((char *)buf)[1] = (value >> 8) & 0xff;
((char *)buf)[2] = (value >> 16) & 0xff;
((char *)buf)[3] = (value >> 24) & 0xff;
((char *)buf)[4] = (value >> 32) & 0xff;
((char *)buf)[5] = (value >> 40) & 0xff;
((char *)buf)[6] = (value >> 48) & 0xff;
((char *)buf)[7] = (value >> 56) & 0xff;
}
return POINTER_DRIFT(buf, sizeof(value));
}
static FORCE_INLINE void *taosDecodeFixed16(void *buf, uint16_t *value) {
if (IS_LITTLE_ENDIAN()) {
memcpy(value, buf, sizeof(*value));
} else {
((char *)value)[1] = ((char *)buf)[0];
((char *)value)[0] = ((char *)buf)[1];
}
return POINTER_DRIFT(buf, sizeof(*value));
}
static FORCE_INLINE void *taosDecodeFixed32(void *buf, uint32_t *value) {
if (IS_LITTLE_ENDIAN()) {
memcpy(value, buf, sizeof(*value));
} else {
((char *)value)[3] = ((char *)buf)[0];
((char *)value)[2] = ((char *)buf)[1];
((char *)value)[1] = ((char *)buf)[2];
((char *)value)[0] = ((char *)buf)[3];
}
return POINTER_DRIFT(buf, sizeof(*value));
}
static FORCE_INLINE void *taosDecodeFixed64(void *buf, uint64_t *value) {
if (IS_LITTLE_ENDIAN()) {
memcpy(value, buf, sizeof(*value));
} else {
((char *)value)[7] = ((char *)buf)[0];
((char *)value)[6] = ((char *)buf)[1];
((char *)value)[5] = ((char *)buf)[2];
((char *)value)[4] = ((char *)buf)[3];
((char *)value)[3] = ((char *)buf)[4];
((char *)value)[2] = ((char *)buf)[5];
((char *)value)[1] = ((char *)buf)[6];
((char *)value)[0] = ((char *)buf)[7];
}
return POINTER_DRIFT(buf, sizeof(*value));
}
// TODO
static FORCE_INLINE void *taosEncodeVariant16(void *buf, uint16_t value) {}
static FORCE_INLINE void *taosEncodeVariant32(void *buf, uint32_t value) {}
static FORCE_INLINE void *taosEncodeVariant64(void *buf, uint64_t value) {}
static FORCE_INLINE void *taosDecodeVariant16(void *buf, uint16_t *value) {}
static FORCE_INLINE void *taosDecodeVariant32(void *buf, uint32_t *value) {}
static FORCE_INLINE void *taosDecodeVariant64(void *buf, uint64_t *value) {}
#ifdef __cplusplus
}
#endif
#endif

View File

@ -27,8 +27,8 @@ class TDTestCase:
tdSql.query('select speed from tb order by ts desc') tdSql.query('select speed from tb order by ts desc')
tdLog.info('tdSql.checkRow(1)') tdLog.info('tdSql.checkRow(1)')
tdSql.checkRows(1) tdSql.checkRows(1)
tdLog.info('tdSql.checkData(0, 0, 1234)') tdLog.info("tdSql.checkData(0, 0, '1234')")
tdSql.checkData(0, 0, 1234) tdSql.checkData(0, 0, '1234')
tdLog.info('=============== step3') tdLog.info('=============== step3')
tdLog.info("insert into tb values (now+2a, '23456')") tdLog.info("insert into tb values (now+2a, '23456')")
tdSql.execute("insert into tb values (now+2a, '23456')") tdSql.execute("insert into tb values (now+2a, '23456')")
@ -37,8 +37,8 @@ class TDTestCase:
tdLog.info('tdSql.checkRow(2)') tdLog.info('tdSql.checkRow(2)')
tdSql.checkRows(2) tdSql.checkRows(2)
tdLog.info('==> $data00') tdLog.info('==> $data00')
tdLog.info('tdSql.checkData(0, 0, 23456)') tdLog.info("tdSql.checkData(0, 0, '23456')")
tdSql.checkData(0, 0, 23456) tdSql.checkData(0, 0, '23456')
tdLog.info('=============== step4') tdLog.info('=============== step4')
tdLog.info("insert into tb values (now+3a, '345678')") tdLog.info("insert into tb values (now+3a, '345678')")
tdSql.error("insert into tb values (now+3a, '345678')") tdSql.error("insert into tb values (now+3a, '345678')")
@ -49,8 +49,8 @@ class TDTestCase:
tdLog.info('tdSql.checkRow(3)') tdLog.info('tdSql.checkRow(3)')
tdSql.checkRows(3) tdSql.checkRows(3)
tdLog.info('==> $data00') tdLog.info('==> $data00')
tdLog.info('tdSql.checkData(0, 0, 34567)') tdLog.info("tdSql.checkData(0, 0, '34567')")
tdSql.checkData(0, 0, 34567) tdSql.checkData(0, 0, '34567')
tdLog.info('drop database db') tdLog.info('drop database db')
tdSql.execute('drop database db') tdSql.execute('drop database db')
tdLog.info('show databases') tdLog.info('show databases')

View File

@ -93,6 +93,9 @@ class TDSql:
if data is None: if data is None:
tdLog.info("sql:%.40s, row:%d col:%d data:%s == expect:%s" % tdLog.info("sql:%.40s, row:%d col:%d data:%s == expect:%s" %
(self.sql, row, col, self.queryResult[row][col], data)) (self.sql, row, col, self.queryResult[row][col], data))
elif isinstance(data, str):
tdLog.info("sql:%.40s, row:%d col:%d data:%s == expect:%s" %
(self.sql, row, col, self.queryResult[row][col], data))
elif isinstance(data, datetime.date): elif isinstance(data, datetime.date):
tdLog.info("sql:%.40s, row:%d col:%d data:%s == expect:%s" % tdLog.info("sql:%.40s, row:%d col:%d data:%s == expect:%s" %
(self.sql, row, col, self.queryResult[row][col], data)) (self.sql, row, col, self.queryResult[row][col], data))

View File

@ -49,19 +49,22 @@ sleep 2000
sql alter user read privilege read sql alter user read privilege read
sql show users sql show users
if $data1_read != read then print $data1_read
if $data1_read != readable then
return -1 return -1
endi endi
sql_error alter user read privilege super sql_error alter user read privilege super
sql show users sql show users
if $data1_read != read then print $data1_read
if $data1_read != readable then
return -1 return -1
endi endi
sql alter user read privilege write sql alter user read privilege write
sql show users sql show users
if $data1_read != write then print $data1_read
if $data1_read != writable then
return -1 return -1
endi endi

View File

@ -49,19 +49,21 @@ sleep 2000
sql alter user read privilege read sql alter user read privilege read
sql show users sql show users
if $data1_read != read then print $data1_read
if $data1_read != readable then
return -1 return -1
endi endi
sql_error alter user read privilege super sql_error alter user read privilege super
sql show users sql show users
if $data1_read != read then print $data1_read
if $data1_read != readable then
return -1 return -1
endi endi
sql alter user read privilege write sql alter user read privilege write
sql show users sql show users
if $data1_read != write then if $data1_read != writable then
return -1 return -1
endi endi

View File

@ -438,22 +438,25 @@ sleep 1000
print ============================== step17 print ============================== step17
print ========= check data print ========= check data
sql reset query cache
sleep 1000
sql use c_b1_d1 sql use c_b1_d1
sql select * from c_b1_t1 sql select * from c_b1_t1
if $rows != 0 then if $rows != 5 then
return -1 return -1
endi endi
sql use c_b1_d2 sql use c_b1_d2
sql select * from c_b1_t2 sql select * from c_b1_t2
if $rows == 0 then if $rows == 6 then
return -1 return -1
endi endi
sql use c_b1_d3 sql use c_b1_d3
sql select * from c_b1_t3 order by t desc sql select * from c_b1_t3 order by t desc
print $data01 $data11 $data21 $data31 $data41 print $data01 $data11 $data21 $data31 $data41
if $rows != 1 then if $rows != 6 then
return -1 return -1
endi endi
if $data01 != 36 then if $data01 != 36 then
@ -540,30 +543,11 @@ if $data41 != 85 then
return -1 return -1
endi endi
sql use c_b1_d9
sql select * from c_b1_t9 order by t desc
print $data01 $data11 $data21 $data31 $data41
if $data01 != 91 then
return -1
endi
if $data11 != 92 then
return -1
endi
if $data21 != 93 then
return -1
endi
if $data31 != 94 then
return -1
endi
if $data41 != 95 then
return -1
endi
print ============================================ over print ============================================ over
#system sh/exec_up.sh -n dnode2 -s stop -x SIGINT system sh/exec_up.sh -n dnode2 -s stop -x SIGINT
#system sh/exec_up.sh -n dnode3 -s stop -x SIGINT system sh/exec_up.sh -n dnode3 -s stop -x SIGINT
#system sh/exec_up.sh -n dnode4 -s stop -x SIGINT system sh/exec_up.sh -n dnode4 -s stop -x SIGINT
#system sh/exec_up.sh -n dnode5 -s stop -x SIGINT system sh/exec_up.sh -n dnode5 -s stop -x SIGINT

View File

@ -7,23 +7,28 @@ GREEN_DARK='\033[0;32m'
GREEN_UNDERLINE='\033[4;32m' GREEN_UNDERLINE='\033[4;32m'
NC='\033[0m' NC='\033[0m'
echo "### run TSIM script ###"
cd script cd script
./test.sh -f basicSuite.sim 2>&1 | grep 'success\|failed\|fault' | tee out.txt ./test.sh -f basicSuite.sim 2>&1 | grep 'success\|failed\|fault' | grep -v 'default' | tee out.txt
totalSuccess=`grep -w 'success' out.txt | wc -l` totalSuccess=`grep 'success' out.txt | wc -l`
totalBasic=`grep success out.txt | grep Suite | wc -l` totalBasic=`grep success out.txt | grep Suite | wc -l`
if [ "$totalSuccess" -gt "0" ]; then if [ "$totalSuccess" -gt "0" ]; then
totalSuccess=`expr $totalSuccess - $totalBasic` totalSuccess=`expr $totalSuccess - $totalBasic`
echo -e "${GREEN} ### Total $totalSuccess TSIM case(s) succeed! ### ${NC}"
fi fi
totalFailed=`grep -w 'failed\|fault' out.txt | wc -l` echo -e "${GREEN} ### Total $totalSuccess TSIM case(s) succeed! ### ${NC}"
totalFailed=`grep 'failed\|fault' out.txt | wc -l`
echo -e "${RED} ### Total $totalFailed TSIM case(s) failed! ### ${NC}"
if [ "$totalFailed" -ne "0" ]; then if [ "$totalFailed" -ne "0" ]; then
echo -e "${RED} ### Total $totalFailed TSIM case(s) failed! ### ${NC}" # echo -e "${RED} ### Total $totalFailed TSIM case(s) failed! ### ${NC}"
exit $totalFailed exit $totalFailed
fi fi
echo "### run Python script ###"
cd ../pytest cd ../pytest
if [ "$1" == "cron" ]; then if [ "$1" == "cron" ]; then

View File

@ -676,6 +676,8 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) {
while ((row = taos_fetch_row(result))) { while ((row = taos_fetch_row(result))) {
if (numOfRows < MAX_QUERY_ROW_NUM) { if (numOfRows < MAX_QUERY_ROW_NUM) {
TAOS_FIELD *fields = taos_fetch_fields(result); TAOS_FIELD *fields = taos_fetch_fields(result);
int* length = taos_fetch_lengths(result);
for (int i = 0; i < num_fields; i++) { for (int i = 0; i < num_fields; i++) {
char *value = NULL; char *value = NULL;
if (i < MAX_QUERY_COL_NUM) { if (i < MAX_QUERY_COL_NUM) {
@ -733,8 +735,9 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) {
break; break;
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
memcpy(value, row[i], fields[i].bytes); memset(value, 0, MAX_QUERY_VALUE_LEN);
value[fields[i].bytes] = 0; memcpy(value, row[i], length[i]);
value[length[i]] = 0;
// snprintf(value, fields[i].bytes, "%s", (char *)row[i]); // snprintf(value, fields[i].bytes, "%s", (char *)row[i]);
break; break;
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP: