从CSV读取记录的工具类:UserBehaviorCsvFileReader,后面在主程序中会用到java8的Steam API来处理集合,所以UserBehaviorCsvFileReader实现了Supplier接口:
public class UserBehaviorCsvFileReader implements Supplier<UserBehavior> { private final String filePath; private CsvReader csvReader; public UserBehaviorCsvFileReader(String filePath) throws IOException { this.filePath = filePath; try { csvReader = new CsvReader(filePath); csvReader.readHeaders(); } catch (IOException e) { throw new IOException("Error reading TaxiRecords from file: " + filePath, e); } } @Override public UserBehavior get() { UserBehavior userBehavior = null; try{ if(csvReader.readRecord()) { csvReader.getRawRecord(); userBehavior = new UserBehavior( Long.valueOf(csvReader.get(0)), Long.valueOf(csvReader.get(1)), Long.valueOf(csvReader.get(2)), csvReader.get(3), new Date(Long.valueOf(csvReader.get(4))*1000L)); } } catch (IOException e) { throw new NoSuchElementException("IOException from " + filePath); } if (null==userBehavior) { throw new NoSuchElementException("All records read from " + filePath); } return userBehavior; } }每条记录对应的Bean类:UserBehavior,和CSV记录格式保持一致即可,表示时间的ts字段,使用了JsonFormat注解,在序列化的时候以此来控制格式:
public class UserBehavior { @JsonFormat private long user_id; @JsonFormat private long item_id; @JsonFormat private long category_id; @JsonFormat private String behavior; @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss'Z'") private Date ts; public UserBehavior() { } public UserBehavior(long user_id, long item_id, long category_id, String behavior, Date ts) { this.user_id = user_id; this.item_id = item_id; this.category_id = category_id; this.behavior = behavior; this.ts = ts; } }Java对象序列化成JSON的序列化类:JsonSerializer
public class JsonSerializer<T> { private final ObjectMapper jsonMapper = new ObjectMapper(); public String toJSONString(T r) { try { return jsonMapper.writeValueAsString(r); } catch (JsonProcessingException e) { throw new IllegalArgumentException("Could not serialize record: " + r, e); } } public byte[] toJSONBytes(T r) { try { return jsonMapper.writeValueAsBytes(r); } catch (JsonProcessingException e) { throw new IllegalArgumentException("Could not serialize record: " + r, e); } } }