-
Notifications
You must be signed in to change notification settings - Fork 709
/
Copy pathapp.py
executable file
·178 lines (144 loc) · 5.25 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
#!/usr/bin/env python3
# Copyright 2022 Tier IV, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import uuid
from autoware_static_centerline_generator.srv import LoadMap
from autoware_static_centerline_generator.srv import PlanPath
from autoware_static_centerline_generator.srv import PlanRoute
from flask import Flask
from flask import jsonify
from flask import request
from flask_cors import CORS
import rclpy
from rclpy.node import Node
rclpy.init()
node = Node("static_centerline_generator_http_server")
app = Flask(__name__)
CORS(app)
def create_client(service_type, server_name):
# create client
cli = node.create_client(service_type, server_name)
while not cli.wait_for_service(timeout_sec=1.0):
print("{} service not available, waiting again...".format(server_name))
return cli
@app.route("/map", methods=["POST"])
def get_map():
data = request.get_json()
map_uuid = str(uuid.uuid4())
global map_id
map_id = map_uuid
# create client
cli = create_client(LoadMap, "/planning/static_centerline_generator/load_map")
# request map loading
req = LoadMap.Request(map=data["map"])
future = cli.call_async(req)
# get result
rclpy.spin_until_future_complete(node, future)
res = future.result()
# error handling
if res.message == "InvalidMapFormat":
return jsonify(code=res.message, message="Map format is invalid."), 400
elif res.message != "":
return (
jsonify(
code="InternalServerError",
message="Error occurred on the server. Please check the terminal.",
),
500,
)
return map_uuid
@app.route("/planned_route", methods=["GET"])
def post_planned_route():
args = request.args.to_dict()
global map_id
if map_id != args.get("map_id"):
# TODO(murooka) error handling for map_id mismatch
print("map_id is not correct.")
# create client
cli = create_client(PlanRoute, "/planning/static_centerline_generator/plan_route")
# request route planning
req = PlanRoute.Request(
start_lane_id=int(args.get("start_lane_id")), end_lane_id=int(args.get("end_lane_id"))
)
future = cli.call_async(req)
# get result
rclpy.spin_until_future_complete(node, future)
res = future.result()
# error handling
if res.message == "MapNotFound":
return jsonify(code=res.message, message="Map is missing."), 404
elif res.message == "RouteNotFound":
return jsonify(code=res.message, message="Planning route failed."), 404
elif res.message != "":
return (
jsonify(
code="InternalServerError",
message="Error occurred on the server. Please check the terminal.",
),
500,
)
return json.dumps(tuple(res.lane_ids))
@app.route("/planned_path", methods=["GET"])
def post_planned_path():
args = request.args.to_dict()
global map_id
if map_id != args.get("map_id"):
# TODO(murooka) error handling for map_id mismatch
print("map_id is not correct.")
# create client
cli = create_client(PlanPath, "/planning/static_centerline_generator/plan_path")
# request path planning
route_lane_ids = [eval(i) for i in request.args.getlist("route[]")]
req = PlanPath.Request(route=route_lane_ids)
future = cli.call_async(req)
# get result
rclpy.spin_until_future_complete(node, future)
res = future.result()
# error handling
if res.message == "MapNotFound":
return jsonify(code=res.message, message="Map is missing."), 404
elif res.message == "LaneletsNotConnected":
return (
jsonify(
code=res.message,
message="Lanelets are not connected.",
object_ids=tuple(res.unconnected_lane_ids),
),
400,
)
elif res.message != "":
return (
jsonify(
code="InternalServerError",
message="Error occurred on the server. Please check the terminal.",
),
500,
)
# create output json
result_json = []
for points_with_lane_id in res.points_with_lane_ids:
current_lane_points = []
for geom_point in points_with_lane_id.points:
point = {"x": geom_point.x, "y": geom_point.y, "z": geom_point.z}
current_lane_points.append(point)
current_result_json = {}
current_result_json["lane_id"] = int(points_with_lane_id.lane_id)
current_result_json["points"] = current_lane_points
result_json.append(current_result_json)
return json.dumps(tuple(result_json))
if __name__ == "__main__":
app.debug = True
app.secret_key = "tmp_secret_key"
app.run(host="localhost", port=4010)