From 84256474eddba93bece14aea6d738432730b0c0e Mon Sep 17 00:00:00 2001 From: jqdelove Date: Tue, 23 Jul 2024 16:46:28 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81=E5=A4=9A=E4=B8=AA=E5=AD=97?= =?UTF-8?q?=E7=AC=A6=E4=BD=9C=E4=B8=BA=E5=88=97=E7=9A=84=E5=88=86=E9=9A=94?= =?UTF-8?q?=E7=AC=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../odps/udf/example/text/TextExtractor.java | 50 ++++++++++++------- 1 file changed, 32 insertions(+), 18 deletions(-) diff --git a/odps-sdk-impl/odps-udf-example/src/main/java/com/aliyun/odps/udf/example/text/TextExtractor.java b/odps-sdk-impl/odps-udf-example/src/main/java/com/aliyun/odps/udf/example/text/TextExtractor.java index 6a1a14ac..1cf07f69 100644 --- a/odps-sdk-impl/odps-udf-example/src/main/java/com/aliyun/odps/udf/example/text/TextExtractor.java +++ b/odps-sdk-impl/odps-udf-example/src/main/java/com/aliyun/odps/udf/example/text/TextExtractor.java @@ -28,7 +28,7 @@ public class TextExtractor extends Extractor { private InputStreamSet inputs; - private char delimiterChar; + private String delimiter; private char linebreakChar; private DataAttributes attributes; private Reader currentReader; @@ -64,14 +64,10 @@ public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes at this.ctx = ctx; // check if "delimiter" attribute is supplied via SQL query String columnDelimiter = this.attributes.getValueByKey("delimiter"); - if ( columnDelimiter != null) { - if (columnDelimiter.length() == 1){ - this.delimiterChar = columnDelimiter.charAt(0); - } else{ - throw new IllegalArgumentException("column delimiter cannot be more than one character, sees: " + columnDelimiter); - } + if (columnDelimiter != null) { + this.delimiter = columnDelimiter; } else { - this.delimiterChar = ','; + this.delimiter = ","; } String lineTerminator = attributes.getValueByKey("line.terminator"); if (lineTerminator != null && !lineTerminator.isEmpty()) { @@ -106,9 +102,9 @@ public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes at } System.out.println( - org.apache.commons.lang.StringEscapeUtils.escapeJava(("TextExtractor set up with delimiter [" + this.delimiterChar + "], " + - " line terminator [" + linebreakChar + "], with complex text flag set to " - + this.complexText + " and reading gzip file set to " + this.isGzip))); + org.apache.commons.lang.StringEscapeUtils.escapeJava(("TextExtractor set up with delimiter [" + this.delimiter + "], " + + " line terminator [" + linebreakChar + "], with complex text flag set to " + + this.complexText + " and reading gzip file set to " + this.isGzip))); // note: more properties can be inited from attributes if needed this.outputColumns = this.attributes.getRecordColumns(); this.outputTypes = new OdpsType[this.outputColumns.length]; @@ -248,7 +244,10 @@ public String[] parseLine(Reader r) throws IOException{ StringBuffer curPart = new StringBuffer(); boolean hasQuotes = false; boolean quoteStarted = false; + StringBuilder readBuffer = new StringBuilder(); + int delimiterIndex = 0; while (ch >= 0) { + readBuffer.append((char) ch); if (hasQuotes) { quoteStarted = true; if (ch == '\"') { @@ -266,10 +265,12 @@ public String[] parseLine(Reader r) throws IOException{ curPart.append('\"'); } } - else if (ch == this.delimiterChar && !quoteStarted) { + else if (readBuffer.toString().endsWith(this.delimiter) && !quoteStarted) { + readBuffer.setLength(readBuffer.length() - this.delimiter.length()); setLinePart(colIndx++, curPart.toString()); curPart = new StringBuffer(); quoteStarted = false; + readBuffer.setLength(0); } else if (ch == '\r' && ignoreLineFeed) { //ignore LF characters @@ -315,9 +316,9 @@ private void handleMismatchLine(int colIndx) { ctx.getCounter("text.parse", "schema.oversize").increment(1); } String errorMsg = "SCHEMA MISMATCH: External Table schema specified a total of [" + - this.fullSchemaColumns.length + "] columns, but current text line parsed into [" - + colIndx + "] columns delimited by [" + this.delimiterChar + "]. Current line is read as: " - + StringUtils.join(this.lineParts, this.delimiterChar); + this.fullSchemaColumns.length + "] columns, but current text line parsed into [" + + colIndx + "] columns delimited by [" + this.delimiter + "]. Current line is read as: " + + StringUtils.join(this.lineParts, this.delimiter); errorMsg = StringEscapeUtils.escapeJava(errorMsg); if (strict) { throw new RuntimeException(errorMsg); @@ -343,14 +344,14 @@ private String[] readNextLine() throws IOException { } while (currentReader != null) { if (this.complexText){ - String[] parts = parseLine(currentReader); - if (parts != null) { + String[] parts = parseLine(currentReader); + if (parts != null) { return parts; } } else { String line = ((BufferedReader)currentReader).readLine(); if (line != null) { - return StringUtils.splitPreserveAllTokens(line, this.delimiterChar); + return splitPreserveAllTokens(line, this.delimiter); } } currentReader = moveToNextStream(); @@ -388,4 +389,17 @@ private Reader moveToNextStream() throws IOException { } } + private String[] splitPreserveAllTokens(String str, String delimiter) { + ArrayList parts = new ArrayList<>(); + int pos = 0; + int end = str.indexOf(delimiter); + while (end != -1) { + parts.add(str.substring(pos, end)); + pos = end + delimiter.length(); + end = str.indexOf(delimiter, pos); + } + parts.add(str.substring(pos)); + return parts.toArray(new String[0]); + } + }