Skip to content

Commit ac80267

Browse files
authored
Add utility to create an empty wal file. (apache#4116)
Updated the CreateEmpty utility with an option to create empty wal file(s).
1 parent 1ae2b17 commit ac80267

File tree

5 files changed

+464
-115
lines changed

5 files changed

+464
-115
lines changed

core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java

-113
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* https://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.accumulo.tserver.util;
20+
21+
import static java.nio.charset.StandardCharsets.UTF_8;
22+
import static org.apache.accumulo.tserver.log.DfsLogger.LOG_FILE_HEADER_V4;
23+
import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
24+
25+
import java.io.DataOutputStream;
26+
import java.io.IOException;
27+
import java.util.ArrayList;
28+
import java.util.List;
29+
30+
import org.apache.accumulo.core.cli.ConfigOpts;
31+
import org.apache.accumulo.core.conf.DefaultConfiguration;
32+
import org.apache.accumulo.core.crypto.CryptoEnvironmentImpl;
33+
import org.apache.accumulo.core.crypto.CryptoUtils;
34+
import org.apache.accumulo.core.file.FileSKVWriter;
35+
import org.apache.accumulo.core.file.rfile.RFileOperations;
36+
import org.apache.accumulo.core.file.rfile.bcfile.Compression;
37+
import org.apache.accumulo.core.metadata.UnreferencedTabletFile;
38+
import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
39+
import org.apache.accumulo.core.spi.crypto.CryptoService;
40+
import org.apache.accumulo.core.spi.file.rfile.compression.NoCompression;
41+
import org.apache.accumulo.server.ServerContext;
42+
import org.apache.accumulo.server.fs.VolumeManager;
43+
import org.apache.accumulo.start.spi.KeywordExecutable;
44+
import org.apache.accumulo.tserver.logger.LogFileKey;
45+
import org.apache.accumulo.tserver.logger.LogFileValue;
46+
import org.apache.hadoop.fs.Path;
47+
import org.slf4j.Logger;
48+
import org.slf4j.LoggerFactory;
49+
50+
import com.beust.jcommander.IParameterValidator;
51+
import com.beust.jcommander.Parameter;
52+
import com.beust.jcommander.ParameterException;
53+
import com.google.auto.service.AutoService;
54+
55+
/**
56+
* Create an empty RFile for use in recovering from data loss where Accumulo still refers internally
57+
* to a path.
58+
*/
59+
@AutoService(KeywordExecutable.class)
60+
public class CreateEmpty implements KeywordExecutable {
61+
private static final Logger LOG = LoggerFactory.getLogger(CreateEmpty.class);
62+
public static final String RF_EXTENSION = ".rf";
63+
public static final String WAL_EXTENSION = ".wal";
64+
65+
public static class MatchesValidFileExtension implements IParameterValidator {
66+
@Override
67+
public void validate(String name, String value) throws ParameterException {
68+
if (value.endsWith(RF_EXTENSION) || value.endsWith(WAL_EXTENSION)) {
69+
return;
70+
}
71+
throw new ParameterException("File must end with either " + RF_EXTENSION + " or "
72+
+ WAL_EXTENSION + " and '" + value + "' does not.");
73+
}
74+
}
75+
76+
public static class IsSupportedCompressionAlgorithm implements IParameterValidator {
77+
@Override
78+
public void validate(String name, String value) throws ParameterException {
79+
List<String> algorithms = Compression.getSupportedAlgorithms();
80+
if (!algorithms.contains(value)) {
81+
throw new ParameterException("Compression codec must be one of " + algorithms);
82+
}
83+
}
84+
}
85+
86+
static class Opts extends ConfigOpts {
87+
@Parameter(names = {"-c", "--codec"}, description = "the compression codec to use.",
88+
validateWith = IsSupportedCompressionAlgorithm.class)
89+
String codec = new NoCompression().getName();
90+
@Parameter(
91+
description = " <path> { <path> ... } Each path given is a URL."
92+
+ " Relative paths are resolved according to the default filesystem defined in"
93+
+ " your Hadoop configuration, which is usually an HDFS instance.",
94+
required = true, validateWith = MatchesValidFileExtension.class)
95+
List<String> files = new ArrayList<>();
96+
97+
public enum OutFileType {
98+
RF, WAL
99+
}
100+
101+
// rfile as default keeps previous behaviour
102+
@Parameter(names = "--type")
103+
public OutFileType fileType = OutFileType.RF;
104+
105+
}
106+
107+
public static void main(String[] args) throws Exception {
108+
new CreateEmpty().execute(args);
109+
}
110+
111+
@Override
112+
public String keyword() {
113+
return "create-empty";
114+
}
115+
116+
@Override
117+
public String description() {
118+
return "Creates empty RFiles (RF) or empty write-ahead log (WAL) files for emergency recovery";
119+
}
120+
121+
@Override
122+
public void execute(String[] args) throws Exception {
123+
124+
Opts opts = new Opts();
125+
opts.parseArgs("accumulo create-empty", args);
126+
127+
var siteConfig = opts.getSiteConfiguration();
128+
try (ServerContext context = new ServerContext(siteConfig)) {
129+
switch (opts.fileType) {
130+
case RF:
131+
createEmptyRFile(opts, context);
132+
break;
133+
case WAL:
134+
createEmptyWal(opts, context);
135+
break;
136+
default:
137+
throw new ParameterException("file type must be RF or WAL, received: " + opts.fileType);
138+
}
139+
}
140+
}
141+
142+
void createEmptyRFile(final Opts opts, final ServerContext context) throws IOException {
143+
var vm = context.getVolumeManager();
144+
145+
CryptoEnvironment env = new CryptoEnvironmentImpl(CryptoEnvironment.Scope.TABLE);
146+
CryptoService cryptoService = context.getCryptoFactory().getService(env,
147+
context.getConfiguration().getAllCryptoProperties());
148+
149+
for (String filename : opts.files) {
150+
Path path = new Path(filename);
151+
checkFileExists(path, vm);
152+
UnreferencedTabletFile tabletFile =
153+
UnreferencedTabletFile.of(vm.getFileSystemByPath(path), path);
154+
LOG.info("Writing to file '{}'", tabletFile);
155+
FileSKVWriter writer = new RFileOperations().newWriterBuilder()
156+
.forFile(tabletFile, vm.getFileSystemByPath(path), context.getHadoopConf(), cryptoService)
157+
.withTableConfiguration(DefaultConfiguration.getInstance()).withCompression(opts.codec)
158+
.build();
159+
writer.close();
160+
}
161+
}
162+
163+
void createEmptyWal(Opts opts, ServerContext context) throws IOException {
164+
final LogFileValue EMPTY = new LogFileValue();
165+
166+
var vm = context.getVolumeManager();
167+
168+
for (String filename : opts.files) {
169+
Path path = new Path(filename);
170+
checkFileExists(path, vm);
171+
try (var out = new DataOutputStream(vm.create(path))) {
172+
LOG.info("Output file: {}", path);
173+
174+
out.write(LOG_FILE_HEADER_V4.getBytes(UTF_8));
175+
176+
CryptoEnvironment env = new CryptoEnvironmentImpl(CryptoEnvironment.Scope.WAL);
177+
CryptoService cryptoService = context.getCryptoFactory().getService(env,
178+
context.getConfiguration().getAllCryptoProperties());
179+
180+
byte[] cryptoParams = cryptoService.getFileEncrypter(env).getDecryptionParameters();
181+
CryptoUtils.writeParams(cryptoParams, out);
182+
183+
LogFileKey key = new LogFileKey();
184+
key.event = OPEN;
185+
key.tserverSession = "";
186+
187+
key.write(out);
188+
EMPTY.write(out);
189+
}
190+
}
191+
}
192+
193+
private void checkFileExists(final Path path, final VolumeManager vm) throws IOException {
194+
if (vm.exists(path)) {
195+
throw new IllegalArgumentException(path + " exists");
196+
}
197+
}
198+
}

0 commit comments

Comments
 (0)