Skip to content

Commit c033993

Browse files
committed
feat: parser support input stream reader
Signed-off-by: Deng An <[email protected]>
1 parent 2e55cc9 commit c033993

File tree

9 files changed

+134
-15
lines changed

9 files changed

+134
-15
lines changed

task-common/src/main/java/com/oppo/cloud/common/domain/job/LogPath.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import com.oppo.cloud.common.constant.LogPathType;
2020
import lombok.Data;
2121

22+
import java.io.InputStream;
23+
2224
/**
2325
* LogPath Information
2426
*/
@@ -45,6 +47,11 @@ public class LogPath {
4547
*/
4648
private String logPath;
4749

50+
/*
51+
* InputStream fot this log
52+
*/
53+
private InputStream inputStream;
54+
4855
public LogPath() {
4956

5057
}
@@ -56,4 +63,10 @@ public LogPath(String protocol, String logType, LogPathType logPathType, String
5663
this.logPath = logPath;
5764
}
5865

66+
public LogPath(String protocol, String logType, InputStream inputStream) {
67+
this.protocol = protocol;
68+
this.logType = logType;
69+
this.inputStream = inputStream;
70+
}
71+
5972
}

task-parser/src/main/java/com/oppo/cloud/parser/domain/reader/ReaderObject.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class ReaderObject {
3737
private BufferedReader bufferedReader;
3838
@Getter
3939
@Setter
40-
private FSDataInputStream fsDataInputStream;
40+
private InputStream inputStream;
4141
@Getter
4242
@Setter
4343
private FileSystem fs;
@@ -46,22 +46,22 @@ public BufferedReader getBufferedReader(String compressCodec) throws IOException
4646
if (bufferedReader != null) {
4747
return bufferedReader;
4848
}
49-
InputStream inputStream;
49+
InputStream decompressedInputStream;
5050
switch (compressCodec.toLowerCase(Locale.ROOT)) {
5151
case "lz4":
52-
inputStream = new LZ4BlockInputStream(fsDataInputStream, false);
52+
decompressedInputStream = new LZ4BlockInputStream(inputStream, false);
5353
break;
5454
case "snappy":
55-
inputStream = new SnappyInputStream(fsDataInputStream);
55+
decompressedInputStream = new SnappyInputStream(inputStream);
5656
break;
5757
case "zstd":
58-
inputStream = new BufferedInputStream(new ZstdInputStream(fsDataInputStream), 32 * 1023);
58+
decompressedInputStream = new BufferedInputStream(new ZstdInputStream(inputStream), 32 * 1023);
5959
break;
6060
default:
61-
inputStream = fsDataInputStream;
61+
decompressedInputStream = inputStream;
6262
break;
6363
}
64-
bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
64+
bufferedReader = new BufferedReader(new InputStreamReader(decompressedInputStream));
6565
return bufferedReader;
6666
}
6767

@@ -74,8 +74,8 @@ public BufferedReader getBufferedReader() throws IOException {
7474

7575
public void close() {
7676
try {
77-
if (fsDataInputStream != null) {
78-
fsDataInputStream.close();
77+
if (inputStream != null) {
78+
inputStream.close();
7979
}
8080
if (bufferedReader != null) {
8181
bufferedReader.close();

task-parser/src/main/java/com/oppo/cloud/parser/service/reader/ILogReaderFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
public interface ILogReaderFactory {
2828

2929
String HDFS = "hdfs";
30+
String STREAM = "stream";
3031
String S3 = "s3";
3132

3233
/**
@@ -36,6 +37,8 @@ default IReader create(LogPath logPath) throws Exception {
3637
switch (logPath.getProtocol()) {
3738
case HDFS:
3839
return new HDFSReader(logPath, getNameNodeConf());
40+
case STREAM:
41+
return new InputStreamReader(logPath);
3942
default:
4043
break;
4144
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2023 OPPO.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.oppo.cloud.parser.service.reader;
17+
18+
import com.oppo.cloud.common.domain.job.LogPath;
19+
import com.oppo.cloud.parser.domain.reader.ReaderObject;
20+
21+
import java.util.List;
22+
23+
public class InputStreamReader implements IReader {
24+
private final LogPath logPath;
25+
26+
public InputStreamReader(LogPath logPath) {
27+
this.logPath = logPath;
28+
}
29+
30+
@Override
31+
public List<String> listFiles() throws Exception {
32+
return null;
33+
}
34+
35+
@Override
36+
public List<String> filesPattern() throws Exception {
37+
return null;
38+
}
39+
40+
@Override
41+
public ReaderObject getReaderObject() throws Exception {
42+
ReaderObject readerObject = new ReaderObject();
43+
readerObject.setLogPath(logPath.getLogPath());
44+
readerObject.setFs(null);
45+
readerObject.setInputStream(logPath.getInputStream());
46+
return readerObject;
47+
}
48+
49+
@Override
50+
public List<ReaderObject> getReaderObjects() throws Exception {
51+
return null;
52+
}
53+
}

task-parser/src/main/java/com/oppo/cloud/parser/utils/HDFSUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public static ReaderObject getReaderObject(NameNodeConf nameNode, String path) t
137137
FSDataInputStream fsDataInputStream = fs.open(new Path(path));
138138
ReaderObject readerObject = new ReaderObject();
139139
readerObject.setLogPath(path);
140-
readerObject.setFsDataInputStream(fsDataInputStream);
140+
readerObject.setInputStream(fsDataInputStream);
141141
readerObject.setFs(fs);
142142
return readerObject;
143143
}

task-parser/src/main/java/com/oppo/cloud/parser/utils/JobHistoryUtil.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@
2020
import com.oppo.cloud.parser.domain.mr.MRAppInfo;
2121
import com.oppo.cloud.parser.domain.reader.ReaderObject;
2222
import lombok.extern.slf4j.Slf4j;
23-
import org.apache.hadoop.fs.FSDataInputStream;
2423
import org.w3c.dom.Document;
2524
import org.w3c.dom.Element;
2625
import org.w3c.dom.NodeList;
2726

2827
import javax.xml.parsers.DocumentBuilder;
2928
import javax.xml.parsers.DocumentBuilderFactory;
3029
import java.io.ByteArrayInputStream;
30+
import java.io.InputStream;
3131
import java.io.IOException;
3232
import java.util.HashMap;
3333
import java.util.List;
@@ -105,8 +105,8 @@ public static Map<String, String> parseJobConf(ReaderObject confReader) throws I
105105
public static MRAppInfo parseJobHistory(List<ReaderObject> readerObjects) throws Exception {
106106
JobHistoryFileInfo jobHistoryFileInfo = getJobHistoryFileInfo(readerObjects);
107107
Map<String, String> conMap = parseJobConf(jobHistoryFileInfo.getConfReader());
108-
FSDataInputStream fsDataInputStream = jobHistoryFileInfo.getJobHistoryReader().getFsDataInputStream();
109-
ReplayMREventLogs replayMREventLogs = new ReplayMREventLogs(fsDataInputStream);
108+
InputStream inputStream = jobHistoryFileInfo.getJobHistoryReader().getInputStream();
109+
ReplayMREventLogs replayMREventLogs = new ReplayMREventLogs(inputStream);
110110
replayMREventLogs.parse();
111111
MRAppInfo appData = replayMREventLogs.getMRAppInfo();
112112
appData.setConfMap(conMap);

task-parser/src/main/java/com/oppo/cloud/parser/utils/ReplayMREventLogs.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.avro.specific.SpecificDatumReader;
3030
import org.apache.hadoop.fs.FSDataInputStream;
3131

32+
import java.io.InputStream;
3233
import java.util.ArrayList;
3334
import java.util.HashMap;
3435
import java.util.List;
@@ -45,13 +46,14 @@ public class ReplayMREventLogs {
4546

4647
private static final String SPECULATION = "Speculation";
4748

48-
public ReplayMREventLogs(FSDataInputStream in) {
49-
this.in = in;
49+
public ReplayMREventLogs(InputStream in) {
50+
this.in = new FSDataInputStream(in);
5051
this.mrAppInfo = new MRAppInfo();
5152
}
5253

5354

5455
public void parse() throws Exception {
56+
5557
String version = this.in.readLine();
5658
String eventSchema = this.in.readLine();
5759
Schema schema = new Schema.Parser().parse(eventSchema);
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.oppo.cloud.parser.service.job.reader;
2+
3+
import com.oppo.cloud.common.constant.LogType;
4+
import com.oppo.cloud.common.domain.job.LogPath;
5+
import com.oppo.cloud.common.domain.job.LogRecord;
6+
import com.oppo.cloud.parser.domain.job.CommonResult;
7+
import com.oppo.cloud.parser.domain.job.ParserParam;
8+
import com.oppo.cloud.parser.domain.job.SparkEventLogParserResult;
9+
import com.oppo.cloud.parser.service.ParamUtil;
10+
import com.oppo.cloud.parser.service.job.parser.IParser;
11+
import com.oppo.cloud.parser.service.job.parser.SimpleParserFactory;
12+
import com.oppo.cloud.parser.service.reader.LogReaderFactory;
13+
import com.oppo.cloud.parser.utils.ResourcePreparer;
14+
import org.junit.Assert;
15+
import org.junit.Test;
16+
17+
import java.io.InputStream;
18+
import java.util.Collections;
19+
20+
public class InputStreamReaderTest extends ResourcePreparer {
21+
@Test
22+
public void inputStreamReaderTest() {
23+
String eventLogPath = "log/event/eventlog";
24+
InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream(eventLogPath);
25+
SimpleParserFactory simpleParserFactory = new SimpleParserFactory();
26+
LogRecord logRecord = ParamUtil.getLogRecord();
27+
String logType = LogType.SPARK_EVENT.getName();
28+
LogPath logPath = new LogPath();
29+
logPath.setInputStream(inputStream);
30+
logPath.setProtocol(LogReaderFactory.STREAM);
31+
logPath.setLogType(logType);
32+
logPath.setLogPath(eventLogPath);
33+
ParserParam parserParam = new ParserParam(logType,
34+
logRecord,
35+
logRecord.getApps().get(0),
36+
Collections.singletonList(logPath));
37+
IParser parser = simpleParserFactory.createParserInternal(parserParam);
38+
CommonResult commonResult = parser.run();
39+
SparkEventLogParserResult result = (SparkEventLogParserResult) commonResult.getResult();
40+
Assert.assertTrue(result.getMemoryCalculateParam().getDriverMemory() == 536870912L);
41+
Assert.assertTrue(result.getMemoryCalculateParam().getAppTotalTime() == 588898L);
42+
43+
}
44+
}

task-parser/src/test/java/com/oppo/cloud/parser/utils/ResourcePreparer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ public class ResourcePreparer extends MiniHdfsCluster {
3030
private static String LOCAL_TEXT_LOG_DIR = "/log/";
3131
private static String HDFS_TEXT_LOG_DIR = "/log";
3232

33+
static {
34+
ParserConfigLoader.init();
35+
}
36+
3337
@BeforeAll
3438
public static void prepareResources() throws IOException {
3539
final URL resourcesDir = ResourcePreparer.class.getResource(LOCAL_TEXT_LOG_DIR);

0 commit comments

Comments
 (0)