diff --git a/pandas/parser.pyx b/pandas/parser.pyx
index 185cf1a752803..564e60bc59677 100644
--- a/pandas/parser.pyx
+++ b/pandas/parser.pyx
@@ -721,6 +721,7 @@ cdef class TextReader:
         cdef:
             size_t rows_read = 0
             chunks = []
+            int status
 
         if rows is None:
             while True:
@@ -747,7 +748,9 @@
                 else:
                     chunks.append(chunk)
 
-        parser_trim_buffers(self.parser)
+        status = parser_trim_buffers(self.parser)
+        if status < 0:
+            raise_parser_error('Error trimming data', self.parser)
 
         if len(chunks) == 0:
             raise StopIteration
@@ -812,7 +815,10 @@
         # trim
         parser_consume_rows(self.parser, rows_read)
         if trim:
-            parser_trim_buffers(self.parser)
+            status = parser_trim_buffers(self.parser)
+
+            if status < 0:
+                raise_parser_error('Error trimming data', self.parser)
         self.parser_start -= rows_read
 
         self._end_clock('Parser memory cleanup')
diff --git a/pandas/src/parser/tokenizer.c b/pandas/src/parser/tokenizer.c
index cad5d98dde53a..79de1beeb1cd2 100644
--- a/pandas/src/parser/tokenizer.c
+++ b/pandas/src/parser/tokenizer.c
@@ -53,6 +53,7 @@ static void *safe_realloc(void *buffer, size_t size) {
     // different-realloc-behaviour-in-linux-and-osx
 
     result = realloc(buffer, size);
+    TRACE(("safe_realloc: buffer = %p, size = %zu, result = %p\n", buffer, size, result))
 
     if (result != NULL) {
         // errno gets set to 12 on my OS Xmachine in some cases even when the
@@ -92,6 +93,7 @@ coliter_t *coliter_new(parser_t *self, int i) {
 
 
 static void free_if_not_null(void *ptr) {
+    TRACE(("free_if_not_null %p\n", ptr))
     if (ptr != NULL) free(ptr);
 }
 
@@ -106,24 +108,28 @@ static void free_if_not_null(void *ptr) {
 
 static void *grow_buffer(void *buffer, int length, int *capacity,
                          int space, int elsize, int *error) {
-    int cap = *capacity;
+    int cap = *capacity;
+    void *newbuffer = buffer;
 
-    // Can we fit potentially nbytes tokens (+ null terminators) in the stream?
-    while (length + space > cap) {
-        cap = cap? cap << 1 : 2;
-
-        buffer = safe_realloc(buffer, elsize * cap);
-
-        if (buffer == NULL) {
-            // TODO: error codes
-            *error = -1;
-        }
-    }
+    // Can we fit potentially nbytes tokens (+ null terminators) in the stream?
+    while ( (length + space >= cap) && (newbuffer != NULL) ){
+        cap = cap? cap << 1 : 2;
+        buffer = newbuffer;
+        newbuffer = safe_realloc(newbuffer, elsize * cap);
+    }
 
-    // sigh, multiple return values
-    *capacity = cap;
-    *error = 0;
-    return buffer;
+    if (newbuffer == NULL) {
+        // realloc failed so don't change *capacity, set *error to errno
+        // and return the last good realloc'd buffer so it can be freed
+        *error = errno;
+        newbuffer = buffer;
+    } else {
+        // realloc worked, update *capacity and set *error to 0
+        // sigh, multiple return values
+        *capacity = cap;
+        *error = 0;
+    }
+    return newbuffer;
 }
 
 
@@ -280,7 +286,7 @@ void parser_free(parser_t *self) {
 
 static int make_stream_space(parser_t *self, size_t nbytes) {
     int i, status, cap;
-    void *orig_ptr;
+    void *orig_ptr, *newptr;
 
     // Can we fit potentially nbytes tokens (+ null terminators) in the stream?
 
@@ -291,11 +297,13 @@ static int make_stream_space(parser_t *self, size_t nbytes) {
      */
 
     orig_ptr = (void *) self->stream;
+    TRACE(("\n\nmake_stream_space: nbytes = %zu.  grow_buffer(self->stream...)\n", nbytes))
     self->stream = (char*) grow_buffer((void *) self->stream,
                                        self->stream_len,
                                        &self->stream_cap, nbytes * 2,
                                        sizeof(char), &status);
-
+    TRACE(("make_stream_space: self->stream=%p, self->stream_len = %zu, self->stream_cap=%zu, status=%zu\n",
+        self->stream, self->stream_len, self->stream_cap, status))
     if (status != 0) {
         return PARSER_OUT_OF_MEMORY;
     }
@@ -323,6 +331,8 @@ static int make_stream_space(parser_t *self, size_t nbytes) {
                                     self->words_len,
                                     &self->words_cap, nbytes,
                                     sizeof(char*), &status);
+    TRACE(("make_stream_space: grow_buffer(self->self->words, %zu, %zu, %zu, %d)\n",
+        self->words_len, self->words_cap, nbytes, status))
     if (status != 0) {
         return PARSER_OUT_OF_MEMORY;
     }
@@ -330,10 +340,12 @@ static int make_stream_space(parser_t *self, size_t nbytes) {
 
     // realloc took place
     if (cap != self->words_cap) {
-        self->word_starts = (int*) safe_realloc((void *) self->word_starts,
-                                                sizeof(int) * self->words_cap);
-        if (self->word_starts == NULL) {
+        TRACE(("make_stream_space: cap != self->words_cap, nbytes = %d, self->words_cap=%d\n", nbytes, self->words_cap))
+        newptr = safe_realloc((void *) self->word_starts, sizeof(int) * self->words_cap);
+        if (newptr == NULL) {
             return PARSER_OUT_OF_MEMORY;
+        } else {
+            self->word_starts = (int*) newptr;
         }
     }
 
@@ -356,17 +368,20 @@ static int make_stream_space(parser_t *self, size_t nbytes) {
                                          self->lines + 1,
                                          &self->lines_cap, nbytes,
                                          sizeof(int), &status);
+    TRACE(("make_stream_space: grow_buffer(self->line_start, %zu, %zu, %zu, %d)\n",
+        self->lines + 1, self->lines_cap, nbytes, status))
     if (status != 0) {
         return PARSER_OUT_OF_MEMORY;
     }
 
     // realloc took place
     if (cap != self->lines_cap) {
-        self->line_fields = (int*) safe_realloc((void *) self->line_fields,
-                                                sizeof(int) * self->lines_cap);
-
-        if (self->line_fields == NULL) {
+        TRACE(("make_stream_space: cap != self->lines_cap, nbytes = %d\n", nbytes))
+        newptr = safe_realloc((void *) self->line_fields, sizeof(int) * self->lines_cap);
+        if (newptr == NULL) {
             return PARSER_OUT_OF_MEMORY;
+        } else {
+            self->line_fields = (int*) newptr;
         }
     }
 
@@ -378,6 +393,14 @@ static int make_stream_space(parser_t *self, size_t nbytes) {
 
 static int push_char(parser_t *self, char c) {
     /* TRACE(("pushing %c \n", c)) */
+    TRACE(("push_char: self->stream[%zu] = %x, stream_cap=%zu\n", self->stream_len+1, c, self->stream_cap))
+    if (self->stream_len >= self->stream_cap) {
+        TRACE(("push_char: ERROR!!! self->stream_len(%d) >= self->stream_cap(%d)\n",
+               self->stream_len, self->stream_cap))
+        self->error_msg = (char*) malloc(64);
+        sprintf(self->error_msg, "Buffer overflow caught - possible malformed input file.\n");
+        return PARSER_OUT_OF_MEMORY;
+    }
     self->stream[self->stream_len++] = c;
     return 0;
 }
@@ -386,6 +409,13 @@ static int P_INLINE end_field(parser_t *self) {
     // XXX cruft
     self->numeric_field = 0;
 
+    if (self->words_len >= self->words_cap) {
+        TRACE(("end_field: ERROR!!! self->words_len(%zu) >= self->words_cap(%zu)\n", self->words_len, self->words_cap))
+        self->error_msg = (char*) malloc(64);
+        sprintf(self->error_msg, "Buffer overflow caught - possible malformed input file.\n");
+        return PARSER_OUT_OF_MEMORY;
+    }
+
     // null terminate token
     push_char(self, '\0');
 
@@ -443,22 +473,17 @@ static int end_line(parser_t *self) {
             ex_fields = self->line_fields[self->lines - 1];
         }
     }
+    if (self->state == SKIP_LINE) {
+        TRACE(("Skipping row %d\n", self->file_lines));
+        // increment file line count
+        self->file_lines++;
 
-    if (self->skipset != NULL) {
-        k = kh_get_int64((kh_int64_t*) self->skipset, self->file_lines);
-
-        if (k != ((kh_int64_t*)self->skipset)->n_buckets) {
-            TRACE(("Skipping row %d\n", self->file_lines));
-            // increment file line count
-            self->file_lines++;
-
-            // skip the tokens from this bad line
-            self->line_start[self->lines] += fields;
+        // skip the tokens from this bad line
+        self->line_start[self->lines] += fields;
 
-            // reset field count
-            self->line_fields[self->lines] = 0;
-            return 0;
-        }
+        // reset field count
+        self->line_fields[self->lines] = 0;
+        return 0;
     }
 
     /* printf("Line: %d, Fields: %d, Ex-fields: %d\n", self->lines, fields, ex_fields); */
@@ -523,6 +548,13 @@ static int end_line(parser_t *self) {
         /* printf("word at column 5: %s\n", COLITER_NEXT(it)); */
 
         // good line, set new start point
+        // good line, set new start point
+        if (self->lines >= self->lines_cap) {
+            TRACE(("end_line: ERROR!!! self->lines(%zu) >= self->lines_cap(%zu)\n", self->lines, self->lines_cap))  \
+            self->error_msg = (char*) malloc(100);  \
+            sprintf(self->error_msg, "Buffer overflow caught - possible malformed input file.\n");  \
+            return PARSER_OUT_OF_MEMORY;  \
+        }
         self->line_start[self->lines] = (self->line_start[self->lines - 1] +
                                          fields);
 
@@ -564,6 +596,8 @@ static int parser_buffer_bytes(parser_t *self, size_t nbytes) {
     status = 0;
     self->datapos = 0;
     self->data = self->cb_io(self->source, nbytes, &bytes_read, &status);
+    TRACE(("parser_buffer_bytes self->cb_io: nbytes=%zu, datalen: %d, status=%d\n",
+        nbytes, bytes_read, status));
     self->datalen = bytes_read;
 
     if (status != REACHED_EOF && self->data == NULL) {
@@ -592,17 +626,16 @@ static int parser_buffer_bytes(parser_t *self, size_t nbytes) {
 
 // printf("pushing %c\n", c);
 
-#if defined(VERBOSE)
 #define PUSH_CHAR(c)                           \
-    printf("Pushing %c, slen now: %d\n", c, slen); \
+    TRACE(("PUSH_CHAR: Pushing %c, slen= %d, stream_cap=%zu, stream_len=%zu\n", c, slen, self->stream_cap, self->stream_len)) \
+    if (slen >= maxstreamsize) {               \
+        TRACE(("PUSH_CHAR: ERROR!!! slen(%d) >= maxstreamsize(%d)\n", slen, maxstreamsize)) \
+        self->error_msg = (char*) malloc(100); \
+        sprintf(self->error_msg, "Buffer overflow caught - possible malformed input file.\n"); \
+        return PARSER_OUT_OF_MEMORY;           \
+    }                                          \
     *stream++ = c;                             \
     slen++;
-#else
-#define PUSH_CHAR(c)                           \
-    *stream++ = c;                             \
-    slen++;
-#endif
-
 
 
 // This is a little bit of a hack but works for now
@@ -656,10 +689,19 @@ typedef int (*parser_op)(parser_t *self, size_t line_limit);
     TRACE(("datapos: %d, datalen: %d\n", self->datapos, self->datalen));
 
 
+int skip_this_line(parser_t *self, int64_t rownum) {
+    if (self->skipset != NULL) {
+        return ( kh_get_int64((kh_int64_t*) self->skipset, self->file_lines) !=
+                 ((kh_int64_t*)self->skipset)->n_buckets );
+    }
+    return 0;
+}
+
 
 int tokenize_delimited(parser_t *self, size_t line_limit)
 {
     int i, slen, start_lines;
+    long maxstreamsize;
     char c;
     char *stream;
     char *buf = self->data + self->datapos;
@@ -674,6 +716,7 @@ int tokenize_delimited(parser_t *self, size_t line_limit)
 
     stream = self->stream + self->stream_len;
     slen = self->stream_len;
+    maxstreamsize = self->stream_cap;
 
     TRACE(("%s\n", buf));
 
@@ -688,10 +731,25 @@ int tokenize_delimited(parser_t *self, size_t line_limit)
 
         switch(self->state) {
 
+        case SKIP_LINE:
+            TRACE(("tokenize_delimited SKIP_LINE %c, state %d\n", c, self->state));
+            if (c == '\n') {
+                END_LINE();
+            }
+            break;
+
         case START_RECORD:
             // start of record
-
-            if (c == '\n') {
+            if (skip_this_line(self, self->file_lines)) {
+                if (c == '\n') {
+                    END_LINE()
+                }
+                else {
+                    self->state = SKIP_LINE;
+                }
+                break;
+            }
+            else if (c == '\n') {
                 // \n\r possible?
                 END_LINE();
                 break;
@@ -914,6 +972,7 @@ int tokenize_delim_customterm(parser_t *self, size_t line_limit)
 {
     int i, slen, start_lines;
+    long maxstreamsize;
     char c;
     char *stream;
     char *buf = self->data + self->datapos;
 
@@ -928,6 +987,7 @@ int tokenize_delim_customterm(parser_t *self, size_t line_limit)
 
     stream = self->stream + self->stream_len;
     slen = self->stream_len;
+    maxstreamsize = self->stream_cap;
 
     TRACE(("%s\n", buf));
 
@@ -941,9 +1001,26 @@ int tokenize_delim_customterm(parser_t *self, size_t line_limit)
                self->state));
 
         switch(self->state) {
+
+        case SKIP_LINE:
+            TRACE(("tokenize_delim_customterm SKIP_LINE %c, state %d\n", c, self->state));
+            if (c == self->lineterminator) {
+                END_LINE();
+            }
+            break;
+
         case START_RECORD:
             // start of record
-            if (c == self->lineterminator) {
+            if (skip_this_line(self, self->file_lines)) {
+                if (c == self->lineterminator) {
+                    END_LINE()
+                }
+                else {
+                    self->state = SKIP_LINE;
+                }
+                break;
+            }
+            else if (c == self->lineterminator) {
                 // \n\r possible?
                 END_LINE();
                 break;
@@ -1116,6 +1193,7 @@ int tokenize_delim_customterm(parser_t *self, size_t line_limit)
 int tokenize_whitespace(parser_t *self, size_t line_limit)
 {
     int i, slen, start_lines;
+    long maxstreamsize;
     char c;
     char *stream;
     char *buf = self->data + self->datapos;
@@ -1129,6 +1207,7 @@ int tokenize_whitespace(parser_t *self, size_t line_limit)
 
     stream = self->stream + self->stream_len;
     slen = self->stream_len;
+    maxstreamsize = self->stream_cap;
 
     TRACE(("%s\n", buf));
 
@@ -1142,6 +1221,12 @@ int tokenize_whitespace(parser_t *self, size_t line_limit)
                self->state));
 
         switch(self->state) {
+        case SKIP_LINE:
+            TRACE(("tokenize_whitespace SKIP_LINE %c, state %d\n", c, self->state));
+            if (c == '\n') {
+                END_LINE();
+            }
+            break;
 
         case EAT_WHITESPACE:
             if (!IS_WHITESPACE(c)) {
@@ -1155,7 +1240,16 @@ int tokenize_whitespace(parser_t *self, size_t line_limit)
 
         case START_RECORD:
             // start of record
-            if (c == '\n') {
+            if (skip_this_line(self, self->file_lines)) {
+                if (c == '\n') {
+                    END_LINE()
+                }
+                else {
+                    self->state = SKIP_LINE;
+                }
+                break;
+            }
+            else if (c == '\n') {
                 // \n\r possible?
                 END_LINE();
                 break;
@@ -1468,32 +1562,73 @@ int parser_trim_buffers(parser_t *self) {
       Free memory
      */
     size_t new_cap;
-
-    /* trim stream */
-    new_cap = _next_pow2(self->stream_len) + 1;
-    if (new_cap < self->stream_cap) {
-        self->stream = safe_realloc((void*) self->stream, new_cap);
-        self->stream_cap = new_cap;
-    }
+    void *newptr;
+    int i;
 
     /* trim words, word_starts */
    new_cap = _next_pow2(self->words_len) + 1;
     if (new_cap < self->words_cap) {
-        self->words = (char**) safe_realloc((void*) self->words,
-                                            new_cap * sizeof(char*));
-        self->word_starts = (int*) safe_realloc((void*) self->word_starts,
-                                                new_cap * sizeof(int));
-        self->words_cap = new_cap;
+        TRACE(("parser_trim_buffers: new_cap < self->words_cap\n"));
+        newptr = safe_realloc((void*) self->words, new_cap * sizeof(char*));
+        if (newptr == NULL) {
+            return PARSER_OUT_OF_MEMORY;
+        } else {
+            self->words = (char**) newptr;
+        }
+        newptr = safe_realloc((void*) self->word_starts, new_cap * sizeof(int));
+        if (newptr == NULL) {
+            return PARSER_OUT_OF_MEMORY;
+        } else {
+            self->word_starts = (int*) newptr;
+            self->words_cap = new_cap;
+        }
+    }
+
+    /* trim stream */
+    new_cap = _next_pow2(self->stream_len) + 1;
+    TRACE(("parser_trim_buffers: new_cap = %zu, stream_cap = %zu, lines_cap = %zu\n",
+           new_cap, self->stream_cap, self->lines_cap));
+    if (new_cap < self->stream_cap) {
+        TRACE(("parser_trim_buffers: new_cap < self->stream_cap, calling safe_realloc\n"));
+        newptr = safe_realloc((void*) self->stream, new_cap);
+        if (newptr == NULL) {
+            return PARSER_OUT_OF_MEMORY;
+        } else {
+            // Update the pointers in the self->words array (char **) if `safe_realloc`
+            //  moved the `self->stream` buffer. This block mirrors a similar block in
+            //  `make_stream_space`.
+            if (self->stream != newptr) {
+                self->pword_start = newptr + self->word_start;
+
+                for (i = 0; i < self->words_len; ++i)
+                {
+                    self->words[i] = newptr + self->word_starts[i];
+                }
+            }
+
+            self->stream = newptr;
+            self->stream_cap = new_cap;
+        }
+    }
 
     /* trim line_start, line_fields */
     new_cap = _next_pow2(self->lines) + 1;
     if (new_cap < self->lines_cap) {
-        self->line_start = (int*) safe_realloc((void*) self->line_start,
-                                               new_cap * sizeof(int));
-        self->line_fields = (int*) safe_realloc((void*) self->line_fields,
-                                                new_cap * sizeof(int));
-        self->lines_cap = new_cap;
+        TRACE(("parser_trim_buffers: new_cap < self->lines_cap\n"));
+        newptr = safe_realloc((void*) self->line_start, new_cap * sizeof(int));
+        if (newptr == NULL) {
+            return PARSER_OUT_OF_MEMORY;
+        } else {
+            self->line_start = (int*) newptr;
+        }
+        newptr = safe_realloc((void*) self->line_fields, new_cap * sizeof(int));
+        if (newptr == NULL) {
+            return PARSER_OUT_OF_MEMORY;
+        } else {
+            self->line_fields = (int*) newptr;
+            self->lines_cap = new_cap;
+        }
     }
 
     return 0;
 }
@@ -1546,7 +1681,8 @@ int _tokenize_helper(parser_t *self, size_t nrows, int all) {
         return 0;
     }
 
-    TRACE(("Asked to tokenize %d rows\n", (int) nrows));
+    TRACE(("Asked to tokenize %d rows, datapos=%d, datalen=%d\n", \
+        (int) nrows, self->datapos, self->datalen));
 
     while (1) {
         if (!all && self->lines - start_lines >= nrows)
@@ -1565,7 +1701,8 @@ int _tokenize_helper(parser_t *self, size_t nrows, int all) {
             }
         }
 
-        TRACE(("Trying to process %d bytes\n", self->datalen - self->datapos));
+        TRACE(("_tokenize_helper: Trying to process %d bytes, datalen=%d, datapos= %d\n",
+               self->datalen - self->datapos, self->datalen, self->datapos));
 
         /* TRACE(("sourcetype: %c, status: %d\n", self->sourcetype, status)); */
         status = tokenize_bytes(self, nrows);
diff --git a/pandas/src/parser/tokenizer.h b/pandas/src/parser/tokenizer.h
index 01f9397685da6..82d5d657e3478 100644
--- a/pandas/src/parser/tokenizer.h
+++ b/pandas/src/parser/tokenizer.h
@@ -123,6 +123,7 @@ typedef enum {
     EAT_CRNL,
     EAT_WHITESPACE,
     EAT_COMMENT,
+    SKIP_LINE,
     FINISHED
 } ParserState;
 