|
| 1 | +## 事务日志——checklist |
| 2 | + |
| 3 | +使用checklist来执行事务日志的记录,checklist以类的形式保存,记录检查点,每执行完一个节点需要做对应的记录。 |
| 4 | + |
| 5 | +checklist的结构如下: |
| 6 | + |
| 7 | +```java |
| 8 | +package org.apache.hugegraph.variables; |
| 9 | + |
| 10 | +import org.apache.hugegraph.config.HugeConfig; |
| 11 | +import org.apache.hugegraph.masterelection.GlobalMasterInfo; |
| 12 | + |
| 13 | +import java.io.Serializable; |
| 14 | + |
| 15 | +public class CheckList implements Serializable { |
| 16 | + |
| 17 | + private String name; |
| 18 | + private String configText; |
| 19 | + |
| 20 | + private HugeConfig config; |
| 21 | + |
| 22 | + private boolean initBackended; |
| 23 | + |
| 24 | + private boolean serverStarted; |
| 25 | + |
| 26 | + private String stage; |
| 27 | + |
| 28 | + private String configPath; |
| 29 | + |
| 30 | + private GlobalMasterInfo nodeInfo; |
| 31 | + |
| 32 | + boolean toCheck; |
| 33 | + String context; |
| 34 | + private boolean isBuild; |
| 35 | + |
| 36 | + public void setBuild(boolean build) { |
| 37 | + isBuild = build; |
| 38 | + } |
| 39 | + |
| 40 | + public CheckList(String name, String context) { |
| 41 | + this.name = name; |
| 42 | + this.context = context; |
| 43 | + } |
| 44 | + |
| 45 | + public HugeConfig getConfig() { |
| 46 | + return config; |
| 47 | + } |
| 48 | + |
| 49 | + public void setConfig(HugeConfig config) { |
| 50 | + this.config = config; |
| 51 | + } |
| 52 | + |
| 53 | + public boolean isInitBackended() { |
| 54 | + return initBackended; |
| 55 | + } |
| 56 | + |
| 57 | + public void setInitBackended(boolean initBackended) { |
| 58 | + this.initBackended = initBackended; |
| 59 | + } |
| 60 | + |
| 61 | + public boolean isServerStarted() { |
| 62 | + return serverStarted; |
| 63 | + } |
| 64 | + |
| 65 | + public void setServerStarted(boolean serverStarted) { |
| 66 | + this.serverStarted = serverStarted; |
| 67 | + } |
| 68 | + |
| 69 | + public String getStage() { |
| 70 | + return stage; |
| 71 | + } |
| 72 | + |
| 73 | + public void setStage(String stage) { |
| 74 | + this.stage = stage; |
| 75 | + } |
| 76 | + |
| 77 | + public String getConfigPath() { |
| 78 | + return configPath; |
| 79 | + } |
| 80 | + |
| 81 | + public void setConfigPath(String configPath) { |
| 82 | + this.configPath = configPath; |
| 83 | + } |
| 84 | + |
| 85 | + public GlobalMasterInfo getNodeInfo() { |
| 86 | + return nodeInfo; |
| 87 | + } |
| 88 | + |
| 89 | + public void setNodeInfo(GlobalMasterInfo nodeInfo) { |
| 90 | + this.nodeInfo = nodeInfo; |
| 91 | + } |
| 92 | + |
| 93 | + public String getName() { |
| 94 | + return name; |
| 95 | + } |
| 96 | + |
| 97 | + public void setName(String name) { |
| 98 | + this.name = name; |
| 99 | + } |
| 100 | +} |
| 101 | + |
| 102 | +``` |
| 103 | + |
| 104 | + |
| 105 | + |
| 106 | +主要涉及到的函数有GraphsAPI类中的create、drop两个函数,以create为例,在原来的流程之上,在检查点处保存log信息: |
| 107 | + |
| 108 | +```java |
| 109 | + public void create(String configPath, GlobalMasterInfo nodeInfo){ |
| 110 | + //CheckList checkList = new CheckList(); |
| 111 | + KResponse result = null; |
| 112 | + try { |
| 113 | + result = client.get(this.name); |
| 114 | + String json = result.getValue(); |
| 115 | + CheckList checkList = JsonUtil.fromJson(json, CheckList.class); |
| 116 | + |
| 117 | + this.initBackend(); |
| 118 | + checkList.setInitBackended(true); |
| 119 | + checkList.setStage("initBackend"); |
| 120 | + client.put(name, JsonUtil.toJson(checkList)); |
| 121 | + this.serverStarted(nodeInfo); |
| 122 | + checkList.setServerStarted(true); |
| 123 | + checkList.setStage("setServerStarted"); |
| 124 | + client.put(name, JsonUtil.toJson(checkList)); |
| 125 | + |
| 126 | + |
| 127 | + // Write config to disk file |
| 128 | + String confPath = ConfigUtil.writeToFile(configPath, this.name(), |
| 129 | + this.configuration()); |
| 130 | + this.configuration.file(confPath); |
| 131 | + } |
| 132 | +``` |
| 133 | + |
| 134 | + |
| 135 | + |
| 136 | +除此之外,还有一些上下文信息需要保存,便于重新恢复: |
| 137 | + |
| 138 | +```java |
| 139 | +checkList.setConfig(config); |
| 140 | +``` |
| 141 | + |
| 142 | + |
| 143 | + |
| 144 | +序列化与日志存放,分别借用了hugegraph中的Jsonutil与pd中的kvclient: |
| 145 | + |
| 146 | +```java |
| 147 | +private KvClient<WatchResponse> client; |
| 148 | +CheckList checkList = JsonUtil.fromJson(json, CheckList.class); |
| 149 | +client.put(name, JsonUtil.toJson(checkList)); |
| 150 | +``` |
| 151 | + |
| 152 | + |
| 153 | + |
| 154 | +恢复机制:在HugeGraphServer中,启动时添加对事务的扫描,扫描的机制为,使用KvClient的前缀扫描,遇到没有执行完的,进行恢复执行。 |
| 155 | + |
| 156 | +```java |
| 157 | +package org.apache.hugegraph; |
| 158 | + |
| 159 | +import org.apache.hugegraph.config.HugeConfig; |
| 160 | +import org.apache.hugegraph.core.GraphManager; |
| 161 | +import org.apache.hugegraph.masterelection.GlobalMasterInfo; |
| 162 | +import org.apache.hugegraph.pd.client.KvClient; |
| 163 | +import org.apache.hugegraph.pd.common.PDException; |
| 164 | +import org.apache.hugegraph.pd.grpc.kv.ScanPrefixResponse; |
| 165 | +import org.apache.hugegraph.pd.grpc.kv.WatchResponse; |
| 166 | +import org.apache.hugegraph.util.ConfigUtil; |
| 167 | +import org.apache.hugegraph.util.Events; |
| 168 | +import org.apache.hugegraph.util.JsonUtil; |
| 169 | +import org.apache.hugegraph.variables.CheckList; |
| 170 | +import org.apache.tinkerpop.gremlin.structure.util.GraphFactory; |
| 171 | + |
| 172 | + |
| 173 | + |
| 174 | +public class TxScanner { |
| 175 | + private final String prefix = "graph_creat_tx"; |
| 176 | + |
| 177 | + private KvClient<WatchResponse> client; |
| 178 | + |
| 179 | + public TxScanner(KvClient<WatchResponse> client) { |
| 180 | + } |
| 181 | + |
| 182 | + |
| 183 | + public void scan() { |
| 184 | + try { |
| 185 | + ScanPrefixResponse response = this.client.scanPrefix(prefix); |
| 186 | + for(String key : response.getKvsMap().keySet()) { |
| 187 | + String value = response.getKvsMap().get(key); |
| 188 | + CheckList checkList = JsonUtil.fromJson(value, CheckList.class); |
| 189 | + switch (checkList.getStage()) { |
| 190 | + case "config": { |
| 191 | + configContinue(checkList); |
| 192 | + } |
| 193 | + case "initBackend" : { |
| 194 | + HugeConfig config = checkList.getConfig(); |
| 195 | + HugeGraph graph = (HugeGraph) GraphFactory.open(config); |
| 196 | + GlobalMasterInfo globalMasterInfo = checkList.getNodeInfo(); |
| 197 | + graph.serverStarted(globalMasterInfo); |
| 198 | + // Write config to disk file |
| 199 | + String confPath = ConfigUtil.writeToFile(checkList.getConfigPath(), graph.name(), |
| 200 | + (HugeConfig)graph.configuration()); |
| 201 | + } |
| 202 | + case "setServerStarted" : { |
| 203 | + HugeConfig config = checkList.getConfig(); |
| 204 | + HugeGraph graph = (HugeGraph) GraphFactory.open(config); |
| 205 | + String confPath = ConfigUtil.writeToFile(checkList.getConfigPath(), graph.name(), |
| 206 | + (HugeConfig)graph.configuration()); |
| 207 | + } |
| 208 | + case "finish" : { |
| 209 | + client.delete(prefix + checkList.getName()); |
| 210 | + } |
| 211 | + } |
| 212 | + } |
| 213 | + } catch (PDException e) { |
| 214 | + throw new RuntimeException(e); |
| 215 | + } |
| 216 | + |
| 217 | + } |
| 218 | + |
| 219 | + private void configContinue(CheckList checkList) { |
| 220 | + HugeConfig config = checkList.getConfig(); |
| 221 | + HugeGraph graph = (HugeGraph) GraphFactory.open(config); |
| 222 | + try { |
| 223 | + // Create graph instance |
| 224 | + graph = (HugeGraph) GraphFactory.open(config); |
| 225 | + String configPath = checkList.getConfigPath(); |
| 226 | + GlobalMasterInfo globalMasterInfo = checkList.getNodeInfo(); |
| 227 | + // Init graph and start it |
| 228 | + graph.create(configPath, globalMasterInfo); |
| 229 | + } catch (Throwable e) { |
| 230 | + throw e; |
| 231 | + } |
| 232 | + |
| 233 | + } |
| 234 | + |
| 235 | +} |
| 236 | +``` |
| 237 | + |
| 238 | + |
| 239 | + |
0 commit comments