Modify AuditReplay workflow to output count and latency of operations (PR #92)
csgregorian authored and xkrogen committed Mar 14, 2019
1 parent 9e4899c commit 66d3e19
Showing 6 changed files with 120 additions and 26 deletions.
AuditReplayMapper.java
@@ -58,7 +58,7 @@
* replayed. For example, a rate factor of 2 would make the replay occur twice as fast, and a rate
* factor of 0.5 would make it occur half as fast.
*/
- public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text, UserCommandKey, LongWritable> {
+ public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text, UserCommandKey, CountTimeWritable> {

public static final String INPUT_PATH_KEY = "auditreplay.input-path";
public static final String OUTPUT_PATH_KEY = "auditreplay.output-path";
@@ -257,15 +257,16 @@ public void cleanup(Mapper.Context context) throws InterruptedException, IOException
@Override
public void configureJob(Job job) {
job.setMapOutputKeyClass(UserCommandKey.class);
- job.setMapOutputValueClass(LongWritable.class);
+ job.setMapOutputValueClass(CountTimeWritable.class);
job.setInputFormatClass(NoSplitTextInputFormat.class);

job.setNumReduceTasks(1);
job.setReducerClass(AuditReplayReducer.class);
job.setOutputKeyClass(UserCommandKey.class);
- job.setOutputValueClass(LongWritable.class);
+ job.setOutputValueClass(CountTimeWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);

TextOutputFormat.setOutputPath(job, new Path(job.getConfiguration().get(OUTPUT_PATH_KEY)));
+ job.getConfiguration().set(TextOutputFormat.SEPERATOR, ",");
}
}
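With the "," separator configured above, TextOutputFormat joins each key and value with a comma, so every record in the job output becomes a single CSV line. A minimal sketch of the composition, using illustrative values that are not from the patch:

    // Illustrative only: how one output line is formed after this change.
    UserCommandKey key = new UserCommandKey("hdfs", "MKDIRS", "WRITE");
    CountTimeWritable value = new CountTimeWritable(7, 812);
    String line = key + "," + value;  // "hdfs,WRITE,MKDIRS,7,812"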
AuditReplayReducer.java
@@ -4,7 +4,6 @@
*/
package com.linkedin.dynamometer.workloadgenerator.audit;

- import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
@@ -16,14 +15,17 @@
* of the command (READ/WRITE).
*/
public class AuditReplayReducer extends
- Reducer<UserCommandKey, LongWritable, UserCommandKey, LongWritable> {
+ Reducer<UserCommandKey, CountTimeWritable, UserCommandKey, CountTimeWritable> {

@Override
- protected void reduce(UserCommandKey key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
- long sum = 0;
- for (LongWritable v : values) {
- sum += v.get();
+ protected void reduce(UserCommandKey key, Iterable<CountTimeWritable> values, Context context)
+ throws IOException, InterruptedException {
+ long countSum = 0;
+ long timeSum = 0;
+ for (CountTimeWritable v : values) {
+ countSum += v.getCount();
+ timeSum += v.getTime();
}
- context.write(key, new LongWritable(sum));
+ context.write(key, new CountTimeWritable(countSum, timeSum));
}
}
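To make the fold above concrete, here is a worked example with invented numbers: three map-side partials for one (user, command, type) key combine into a single count and cumulative latency.

    // Partial (count, time) pairs emitted by the mappers for the same key.
    long[][] partials = {{1, 100}, {1, 40}, {5, 360}};
    long countSum = 0;
    long timeSum = 0;
    for (long[] p : partials) {
      countSum += p[0];  // 1 + 1 + 5 = 7 operations
      timeSum += p[1];   // 100 + 40 + 360 = 500 ms cumulative latency
    }
    // The reducer emits the equivalent of new CountTimeWritable(7, 500).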
AuditReplayThread.java
@@ -25,7 +25,6 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
- import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.counters.GenericCounter;
@@ -64,7 +63,7 @@ public class AuditReplayThread extends Thread {
// and merge them all together at the end.
private Map<REPLAYCOUNTERS, Counter> replayCountersMap = new HashMap<>();
private Map<String, Counter> individualCommandsMap = new HashMap<>();
- private Map<UserCommandKey, LongWritable> commandLatencyMap = new HashMap<>();
+ private Map<UserCommandKey, CountTimeWritable> commandLatencyMap = new HashMap<>();

AuditReplayThread(Mapper.Context mapperContext, DelayQueue<AuditReplayCommand> queue,
ConcurrentMap<String, FileSystem> fsCache) throws IOException {
@@ -102,7 +101,7 @@ void drainCounters(Mapper.Context context) {
}

void drainCommandLatencies(Mapper.Context context) throws InterruptedException, IOException {
- for (Map.Entry<UserCommandKey, LongWritable> ent : commandLatencyMap.entrySet()) {
+ for (Map.Entry<UserCommandKey, CountTimeWritable> ent : commandLatencyMap.entrySet()) {
context.write(ent.getKey(), ent.getValue());
}
}
@@ -265,10 +264,12 @@ public FileSystem run() {

long latency = System.currentTimeMillis() - startTime;

- UserCommandKey userCommandKey = new UserCommandKey(command.getSimpleUgi(), replayCommand.getType().toString());
- commandLatencyMap.putIfAbsent(userCommandKey, new LongWritable(0));
- LongWritable latencyWritable = commandLatencyMap.get(userCommandKey);
- latencyWritable.set(latencyWritable.get() + latency);
+ UserCommandKey userCommandKey = new UserCommandKey(command.getSimpleUgi(),
+ replayCommand.toString(), replayCommand.getType().toString());
+ commandLatencyMap.putIfAbsent(userCommandKey, new CountTimeWritable());
+ CountTimeWritable latencyWritable = commandLatencyMap.get(userCommandKey);
+ latencyWritable.setCount(latencyWritable.getCount() + 1);
+ latencyWritable.setTime(latencyWritable.getTime() + latency);

switch (replayCommand.getType()) {
case WRITE:
CountTimeWritable.java (new file)
@@ -0,0 +1,71 @@
/*
* Copyright 2019 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.dynamometer.workloadgenerator.audit;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


/**
* CountTimeWritable is a {@link Writable} used as a composite value that accumulates the count
* and cumulative latency of replayed commands. It is used as the output value for
* AuditReplayMapper and AuditReplayReducer.
*/
public class CountTimeWritable implements Writable {
private LongWritable count;
private LongWritable time;

public CountTimeWritable() {
count = new LongWritable();
time = new LongWritable();
}

public CountTimeWritable(LongWritable count, LongWritable time) {
this.count = count;
this.time = time;
}

public CountTimeWritable(long count, long time) {
this.count = new LongWritable(count);
this.time = new LongWritable(time);
}

public long getCount() {
return count.get();
}

public long getTime() {
return time.get();
}

public void setCount(long count) {
this.count.set(count);
}

public void setTime(long time) {
this.time.set(time);
}

@Override
public void write(DataOutput out) throws IOException {
count.write(out);
time.write(out);
}

@Override
public void readFields(DataInput in) throws IOException {
count.readFields(in);
time.readFields(in);
}

@Override
public String toString() {
return getCount() + "," + getTime();
}
}
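A short usage sketch of the class above, mirroring how AuditReplayThread accumulates one replayed call; the average-latency line illustrates what a consumer of the output can derive and is not part of the patch:

    CountTimeWritable v = new CountTimeWritable();
    v.setCount(v.getCount() + 1);  // one more replayed operation
    v.setTime(v.getTime() + 42L);  // 42 ms for this call (illustrative)
    double avgLatencyMs = (double) v.getTime() / v.getCount();  // 42.0
    // v.toString() -> "1,42", the count,time tail of each output line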
UserCommandKey.java
@@ -13,27 +13,31 @@
import org.apache.hadoop.io.WritableComparable;

/**
- * UserCommandKey is a {@link WritableComparable} used as a composite key combining the user id and
- * type of a replayed command. It is used as the output key for AuditReplayMapper and the
+ * UserCommandKey is a {@link WritableComparable} used as a composite key combining the user id, name,
+ * and type of a replayed command. It is used as the output key for AuditReplayMapper and the
* input and output key for AuditReplayReducer.
*/
public class UserCommandKey implements WritableComparable {
private Text user;
private Text command;
+ private Text type;

public UserCommandKey() {
user = new Text();
command = new Text();
+ type = new Text();
}

- public UserCommandKey(Text user, Text command) {
+ public UserCommandKey(Text user, Text command, Text type) {
this.user = user;
this.command = command;
+ this.type = type;
}

- public UserCommandKey(String user, String command) {
+ public UserCommandKey(String user, String command, String type) {
this.user = new Text(user);
this.command = new Text(command);
+ this.type = new Text(type);
}

public String getUser() {
@@ -43,17 +47,23 @@ public String getUser() {
public String getCommand() {
return command.toString();
}

+ public String getType() {
+ return type.toString();
+ }

@Override
public void write(DataOutput out) throws IOException {
user.write(out);
command.write(out);
+ type.write(out);
}

@Override
public void readFields(DataInput in) throws IOException {
user.readFields(in);
command.readFields(in);
+ type.readFields(in);
}

@Override
@@ -63,7 +73,7 @@ public int compareTo(@Nonnull Object o) {

@Override
public String toString() {
- return getUser() + "," + getCommand();
+ return getUser() + "," + getType() + "," + getCommand();
}

@Override
@@ -75,11 +85,13 @@ public boolean equals(Object o) {
return false;
}
UserCommandKey that = (UserCommandKey) o;
- return getUser().equals(that.getUser()) && getCommand().equals(that.getCommand());
+ return getUser().equals(that.getUser()) &&
+ getCommand().equals(that.getCommand()) &&
+ getType().equals(that.getType());
}

@Override
public int hashCode() {
- return Objects.hash(getUser(), getCommand());
+ return Objects.hash(getUser(), getCommand(), getType());
}
}
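Note that toString() orders the fields as user, type, command, which is the line prefix the updated test matches below. An illustrative sketch:

    UserCommandKey key = new UserCommandKey("hdfs", "CREATE", "WRITE");  // illustrative values
    key.toString();   // "hdfs,WRITE,CREATE" -- type precedes the command name
    key.getCommand(); // "CREATE"
    key.getType();    // "WRITE"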
TestWorkloadGenerator.java
@@ -24,6 +24,8 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ImpersonationProvider;
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -32,7 +34,8 @@


public class TestWorkloadGenerator {
-
+ private static final Log LOG = LogFactory.getLog(TestWorkloadGenerator.class);
+
private Configuration conf;
private MiniDFSCluster miniCluster;
private FileSystem dfs;
@@ -114,7 +117,11 @@ private void testAuditWorkloadWithOutput(String auditOutputPath) throws Exception
assertTrue(dfs.exists(new Path(auditOutputPath)));
try (FSDataInputStream auditOutputFile = dfs.open(new Path(auditOutputPath, "part-r-00000"))) {
String auditOutput = IOUtils.toString(auditOutputFile);
- assertTrue(auditOutput.matches(".*hdfs,WRITE\\t[0-9]+\\n.*"));
+ LOG.info(auditOutput);
+ // Matches three lines of the format "hdfs,WRITE,command,count,time".
+ // The count group is [17]+ because each operation runs either 1 or 7
+ // times, but the output order of the lines isn't guaranteed.
+ assertTrue(auditOutput.matches(".*(hdfs,WRITE,[A-Z]+,[17]+,[0-9]+\\n){3}.*"));
}
}
}
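For reference, reducer output that satisfies the new assertion would look like the following; the command names and timings are illustrative, not taken from an actual run:

    hdfs,WRITE,CREATE,7,812
    hdfs,WRITE,MKDIRS,1,95
    hdfs,WRITE,SETPERMISSION,1,13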
