fix the issue #433
This commit is contained in:
parent
1977ea4583
commit
148b783534
|
@ -171,6 +171,8 @@ typedef struct STagCond {
|
||||||
|
|
||||||
typedef struct STableDataBlocks {
|
typedef struct STableDataBlocks {
|
||||||
char meterId[TSDB_METER_ID_LEN];
|
char meterId[TSDB_METER_ID_LEN];
|
||||||
|
int8_t tsSource;
|
||||||
|
|
||||||
int64_t vgid;
|
int64_t vgid;
|
||||||
int64_t size;
|
int64_t size;
|
||||||
|
|
||||||
|
|
|
@ -53,6 +53,11 @@
|
||||||
return TSDB_CODE_INVALID_SQL; \
|
return TSDB_CODE_INVALID_SQL; \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
static enum {
|
||||||
|
TSDB_USE_SERVER_TS = 0,
|
||||||
|
TSDB_USE_CLI_TS = 1,
|
||||||
|
};
|
||||||
|
|
||||||
static void setErrMsg(char *msg, char *sql);
|
static void setErrMsg(char *msg, char *sql);
|
||||||
static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize);
|
static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize);
|
||||||
|
|
||||||
|
@ -166,6 +171,40 @@ int tsParseTime(char *value, int32_t valuelen, int64_t *time, char **next, char
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* The server time/client time should not be mixed up in one sql string
|
||||||
|
* Do not employ sort operation is not involved if server time is used.
|
||||||
|
*/
|
||||||
|
static int32_t tsCheckTimestamp(STableDataBlocks* pDataBlocks, const char* start) {
|
||||||
|
// once the data block is disordered, we do NOT keep previous timestamp any more
|
||||||
|
if (!pDataBlocks->ordered) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
TSKEY k = *(TSKEY *) start;
|
||||||
|
|
||||||
|
if (k == 0) {
|
||||||
|
if (pDataBlocks->tsSource == TSDB_USE_CLI_TS) {
|
||||||
|
return -1;
|
||||||
|
} else if (pDataBlocks->tsSource == -1) {
|
||||||
|
pDataBlocks->tsSource = TSDB_USE_SERVER_TS ;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (pDataBlocks->tsSource == TSDB_USE_SERVER_TS) {
|
||||||
|
return -1;
|
||||||
|
} else if (pDataBlocks->tsSource == -1) {
|
||||||
|
pDataBlocks->tsSource = TSDB_USE_CLI_TS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (k <= pDataBlocks->prevTS && (pDataBlocks->tsSource == TSDB_USE_CLI_TS)) {
|
||||||
|
pDataBlocks->ordered = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
pDataBlocks->prevTS = k;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tsParseOneColumnData(SSchema *pSchema, char *value, int valuelen, char *payload, char *msg, char **str,
|
int32_t tsParseOneColumnData(SSchema *pSchema, char *value, int valuelen, char *payload, char *msg, char **str,
|
||||||
bool primaryKey, int16_t timePrec) {
|
bool primaryKey, int16_t timePrec) {
|
||||||
int64_t temp;
|
int64_t temp;
|
||||||
|
@ -391,20 +430,15 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[
|
||||||
valuelen++;
|
valuelen++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX);
|
||||||
int32_t ret = tsParseOneColumnData(&schema[colIndex], value, valuelen, start, error, str,
|
int32_t ret = tsParseOneColumnData(&schema[colIndex], value, valuelen, start, error, str,
|
||||||
colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX, timePrec);
|
colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX, timePrec);
|
||||||
if (ret != 0) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
return -1; // NOTE: here 0 mean error!
|
return -1; // NOTE: here 0 mean error!
|
||||||
}
|
}
|
||||||
|
|
||||||
// once the data block is disordered, we do NOT keep previous timestamp any more
|
if (isPrimaryKey && tsCheckTimestamp(pDataBlocks, start) != TSDB_CODE_SUCCESS) {
|
||||||
if (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX && pDataBlocks->ordered) {
|
return -1;
|
||||||
TSKEY k = *(TSKEY *)start;
|
|
||||||
if (k <= pDataBlocks->prevTS) {
|
|
||||||
pDataBlocks->ordered = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
pDataBlocks->prevTS = k;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -551,6 +585,11 @@ void sortRemoveDuplicates(STableDataBlocks *dataBuf) {
|
||||||
// size is less than the total size, since duplicated rows may be removed yet.
|
// size is less than the total size, since duplicated rows may be removed yet.
|
||||||
assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SShellSubmitBlock) == dataBuf->size);
|
assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SShellSubmitBlock) == dataBuf->size);
|
||||||
|
|
||||||
|
// if use server time, this block must be ordered
|
||||||
|
if (dataBuf->tsSource == TSDB_USE_SERVER_TS) {
|
||||||
|
assert(dataBuf->ordered);
|
||||||
|
}
|
||||||
|
|
||||||
if (!dataBuf->ordered) {
|
if (!dataBuf->ordered) {
|
||||||
char *pBlockData = pBlocks->payLoad;
|
char *pBlockData = pBlocks->payLoad;
|
||||||
qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);
|
qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);
|
||||||
|
|
|
@ -338,7 +338,8 @@ STableDataBlocks* tscCreateDataBlock(int32_t size) {
|
||||||
STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks));
|
STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks));
|
||||||
dataBuf->nAllocSize = (uint32_t)size;
|
dataBuf->nAllocSize = (uint32_t)size;
|
||||||
dataBuf->pData = calloc(1, dataBuf->nAllocSize);
|
dataBuf->pData = calloc(1, dataBuf->nAllocSize);
|
||||||
dataBuf->ordered = true;
|
|
||||||
|
dataBuf->tsSource = -1;
|
||||||
dataBuf->prevTS = INT64_MIN;
|
dataBuf->prevTS = INT64_MIN;
|
||||||
return dataBuf;
|
return dataBuf;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue