Skip to content

Commit

Permalink
Merge pull request #275 from basho/fixes/lrb/allow-transport-choice
Browse files Browse the repository at this point in the history
Add the ability to switch to/from TTB for TS put, get and query requests
  • Loading branch information
Brett Hazen committed Apr 14, 2016
2 parents 219156a + 95b4ee6 commit 49142ee
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 57 deletions.
51 changes: 35 additions & 16 deletions src/riakc_pb_socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,13 @@
%% can be overridden by setting the application environment variable
%% of the same name on the `riakc' application, for example:
%% `application:set_env(riakc, ping_timeout, 5000).'
-record(request, {ref :: reference(), msg :: rpb_req(), from, ctx :: ctx(), timeout :: timeout(),
tref :: reference() | undefined }).
-record(request, {ref :: reference(),
msg :: rpb_req(),
from, ctx :: ctx(),
timeout :: timeout(),
tref :: reference() | undefined,
opts :: proplists:proplist()
}).

-type portnum() :: non_neg_integer(). %% The TCP port number of the Riak node's Protocol Buffers interface
-type address() :: string() | atom() | inet:ip_address(). %% The TCP/IP host name or address of the Riak node
Expand Down Expand Up @@ -1948,6 +1953,9 @@ process_response(#request{msg = #rpbgetbucketkeypreflistreq{}},
process_response(#request{msg = #tsputreq{}},
#tsputresp{}, State) ->
{reply, ok, State};
process_response(#request{msg = #tsputreq{}},
tsputresp, State) ->
{reply, ok, State};

process_response(#request{msg = #tsdelreq{}},
tsdelresp, State) ->
Expand All @@ -1972,29 +1980,31 @@ process_response(#request{msg = #tslistkeysreq{}} = Request,
process_response(#request{msg = #tsqueryreq{}},
tsqueryresp, State) ->
{reply, tsqueryresp, State};

process_response(#request{msg = #tsqueryreq{}},
Result = {tsqueryresp, _},
State) ->
{reply, Result, State};

process_response(#request{msg = #tsqueryreq{}},
Result = #tsqueryresp{},
State) ->
{reply, Result, State};
process_response(#request{msg = #tscoveragereq{}},
#tscoverageresp{entries = E}, State) ->
{reply, {ok, E}, State};

process_response(#request{msg = #rpbcoveragereq{}},
#rpbcoverageresp{entries = E}, State) ->
{reply, {ok, E}, State};

process_response(#request{msg = #tsgetreq{}},
tsgetresp, State) ->
{reply, tsgetresp, State};

process_response(#request{msg = #tsgetreq{}},
Result = {tsgetresp, _},
State) ->
{reply, Result, State};

process_response(#request{msg = #tsgetreq{}},
Result = #tsgetresp{},
State) ->
{reply, Result, State};
process_response(Request, Reply, State) ->
%% Unknown request/response combo
{reply, {error, {unknown_response, Request, Reply}}, State}.
Expand Down Expand Up @@ -2090,14 +2100,22 @@ send_mapred_req(Pid, MapRed, ClientPid) ->

%% @private
%% Make a new request that can be sent or queued
new_request({Msg, {msgopts, Options}}, From, Timeout) ->
Ref = make_ref(),
#request{ref = Ref,
msg = Msg,
from = From,
timeout = Timeout,
tref = create_req_timer(Timeout, Ref),
opts = Options};
new_request(Msg, From, Timeout) ->
Ref = make_ref(),
#request{ref = Ref, msg = Msg, from = From, timeout = Timeout,
tref = create_req_timer(Timeout, Ref)}.
tref = create_req_timer(Timeout, Ref), opts = []}.
new_request(Msg, From, Timeout, Context) ->
Ref = make_ref(),
#request{ref =Ref, msg = Msg, from = From, ctx = Context, timeout = Timeout,
tref = create_req_timer(Timeout, Ref)}.
tref = create_req_timer(Timeout, Ref), opts = []}.

%% @private
%% Create a request timer if desired, otherwise return undefined.
Expand Down Expand Up @@ -2266,22 +2284,23 @@ send_request(Request0, State) when State#state.active =:= undefined ->
end.

%% @private
encode(Msg=#tsputreq{}) ->
encode(Msg=#tsputreq{}, true) ->
riak_ttb_codec:encode(Msg);
encode(Msg=#tsgetreq{}) ->
encode(Msg=#tsgetreq{}, true) ->
riak_ttb_codec:encode(Msg);
encode(Msg=#tsqueryreq{}) ->
encode(Msg=#tsqueryreq{}, true) ->
riak_ttb_codec:encode(Msg);
encode(Msg) ->
encode(Msg, _UseTTB) ->
riak_pb_codec:encode(Msg).

%% Already encoded (for tunneled messages), but must provide Message Id
%% for responding to the second form of send_request.
encode_request_message(#request{msg={tunneled,MsgId,Pkt}}=Req) ->
{Req#request{msg={tunneled,MsgId}},[MsgId|Pkt]};
%% Unencoded Request (the normal PB client path)
encode_request_message(#request{msg=Msg}=Req) ->
{Req, encode(Msg)}.
encode_request_message(#request{msg=Msg,opts=Opts}=Req) ->
UseTTB = proplists:get_value(use_ttb, Opts, true),
{Req, encode(Msg, UseTTB)}.

%% If the socket was closed, see if we can enqueue the request and
%% trigger a reconnect. Otherwise, return an error to the requestor.
Expand Down
88 changes: 52 additions & 36 deletions src/riakc_ts.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

-module(riakc_ts).

-export([query/2, query/3, query/4,
-export([query/2, query/3, query/4, query/5,
get_coverage/3,
put/3, put/4,
get/4,
Expand All @@ -44,33 +44,48 @@

-spec query(pid(), Query::string()|binary()) ->
{ColumnNames::[ts_columnname()], Rows::[tuple()]} | {error, Reason::term()}.
%% @equiv query/4.
query(Pid, QueryText) ->
query(Pid, QueryText, [], undefined).
%% @equiv query/5.
query(Pid, Query) ->
query(Pid, Query, [], undefined, []).

-spec query(pid(), Query::string()|binary(), Interpolations::[{binary(), binary()}]) ->
{ColumnNames::[binary()], Rows::[tuple()]} | {error, term()}.
%% @equiv query/4.
query(Pid, QueryText, Interpolations) ->
query(Pid, QueryText, Interpolations, undefined).

-spec query(pid(), Query::string(), Interpolations::[{binary(), binary()}], Cover::term()) ->
{ColumnNames::[binary()], Rows::[tuple()]} | {error, term()}.
%% @equiv query/5.
query(Pid, Query, Interpolations) ->
query(Pid, Query, Interpolations, undefined, []).

-spec query(Pid::pid(),
Query::string(),
Interpolations::[{binary(), binary()}],
Cover::term()) ->
{ColumnNames::[binary()], Rows::[tuple()]} | {error, term()}.
%% @equiv query/5.
query(Pid, Query, Interpolations, Cover) ->
query(Pid, Query, Interpolations, Cover, []).

-spec query(Pid::pid(),
Query::string(),
Interpolations::[{binary(), binary()}],
Cover::term(),
Options::proplists:proplist()) ->
{ColumnNames::[binary()], Rows::[tuple()]} | {error, term()}.
%% @doc Execute a Query with client. The result returned
%% is a tuple containing a list of columns as binaries in the
%% first element, and a list of records, each represented as a
%% list of values, in the second element, or an @{error, Reason@}
%% tuple.
query(Pid, Query, Interpolations, undefined) ->
query_common(Pid, Query, Interpolations, undefined);
query(Pid, Query, Interpolations, Cover) when is_binary(Cover) ->
query_common(Pid, Query, Interpolations, Cover).
query(Pid, Query, Interpolations, undefined, Options) ->
query_common(Pid, Query, Interpolations, undefined, Options);
query(Pid, Query, Interpolations, Cover, Options) when is_binary(Cover) ->
query_common(Pid, Query, Interpolations, Cover, Options).

query_common(Pid, Query, Interpolations, Cover)
query_common(Pid, Query, Interpolations, Cover, Options)
when is_pid(Pid), is_list(Query) ->
Message = riakc_ts_query_operator:serialize(
Msg0 = riakc_ts_query_operator:serialize(
iolist_to_binary(Query), Interpolations),
Response = server_call(Pid, Message#tsqueryreq{cover_context = Cover}),
Msg1 = Msg0#tsqueryreq{cover_context = Cover},
Msg = {Msg1, {msgopts, Options}},
Response = server_call(Pid, Msg),
riakc_ts_query_operator:deserialize(Response).


Expand All @@ -85,14 +100,17 @@ get_coverage(Pid, Table, QueryText) ->
table = iolist_to_binary(Table)}).


-spec put(pid(), table_name(), [[ts_value()]]) ->
ok | {error, Reason::term()}.
-spec put(pid(),
table_name(),
[[ts_value()]]) -> ok | {error, Reason::term()}.
%% @equiv put/4.
put(Pid, Table, Measurements) ->
put(Pid, Table, [], Measurements).
put(Pid, Table, Measurements, []).

-spec put(pid(), table_name(), ColumnNames::[ts_columnname()], Data::[[ts_value()]]) ->
ok | {error, Reason::term()}.
-spec put(pid(),
table_name(),
[[ts_value()]],
Options::proplists:proplist()) -> ok | {error, Reason::term()}.
%% @doc Make data records from Data and insert them, individually,
%% into a time-series Table, using client Pid. Each record is a
%% list of values of appropriate types for the complete set of
Expand All @@ -102,11 +120,13 @@ put(Pid, Table, Measurements) ->
%%
%% As of 2015-11-05, ColumnNames parameter is ignored, the function
%% expects the full set of fields in each element of Data.
put(Pid, Table, ColumnNames, Measurements)
put(Pid, Table, Measurements, Options)
when is_pid(Pid) andalso (is_binary(Table) orelse is_list(Table)) andalso
is_list(ColumnNames) andalso is_list(Measurements) ->
Message = riakc_ts_put_operator:serialize(Table, ColumnNames, Measurements),
Response = server_call(Pid, Message),
is_list(Measurements) ->
UseTTB = proplists:get_value(use_ttb, Options, true),
Message = riakc_ts_put_operator:serialize(Table, Measurements, UseTTB),
Msg = {Message, {msgopts, Options}},
Response = server_call(Pid, Msg),
riakc_ts_put_operator:deserialize(Response).


Expand Down Expand Up @@ -145,16 +165,12 @@ delete(Pid, Table, Key, Options)
get(Pid, Table, Key, Options)
when is_pid(Pid), (is_binary(Table) orelse is_list(Table)),
is_list(Key), is_list(Options) ->
Message = #tsgetreq{table = iolist_to_binary(Table),
key = Key,
timeout = proplists:get_value(timeout, Options)},

case server_call(Pid, Message) of
{error, OtherError} ->
{error, OtherError};
{tsgetresp, {ColumnNames, _ColumnTypes, Rows}} ->
{ok, {ColumnNames, Rows}}
end.
UseTTB = proplists:get_value(use_ttb, Options, true),
Msg0 = riakc_ts_get_operator:serialize(Table, Key, UseTTB),
Msg1 = Msg0#tsgetreq{timeout = proplists:get_value(timeout, Options)},
Msg = {Msg1, {msgopts, Options}},
Response = server_call(Pid, Msg),
riakc_ts_get_operator:deserialize(Response).


-spec stream_list_keys(pid(), table_name(), proplists:proplist()) ->
Expand Down
54 changes: 54 additions & 0 deletions src/riakc_ts_get_operator.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
%% -------------------------------------------------------------------
%%
%% riakc_ts_get_operator.erl: helper functions for get requests to Riak TS
%%
%% Copyright (c) 2016 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

%% @doc Helper functions for put requests to Riak TS

-module(riakc_ts_get_operator).

-include_lib("riak_pb/include/riak_pb.hrl").
-include_lib("riak_pb/include/riak_ts_ttb.hrl").
-include_lib("riak_pb/include/riak_ts_pb.hrl").

-export([serialize/3,
deserialize/1]).


serialize(Table, Key, true) ->
#tsgetreq{table = iolist_to_binary(Table),
key = Key};
serialize(Table, Key, false) ->
SerializedKey = riak_pb_ts_codec:encode_cells_non_strict(Key),
#tsgetreq{table = iolist_to_binary(Table),
key = SerializedKey}.

deserialize({error, {Code, Message}}) when is_integer(Code), is_list(Message) ->
{error, {Code, iolist_to_binary(Message)}};
deserialize({error, {Code, Message}}) when is_integer(Code), is_atom(Message) ->
{error, {Code, iolist_to_binary(atom_to_list(Message))}};
deserialize({error, Message}) ->
{error, Message};
deserialize({tsgetresp, {ColumnNames, _ColumnTypes, Rows}}) ->
{ok, {ColumnNames, Rows}};
deserialize(#tsgetresp{columns = C, rows = R}) ->
ColumnNames = [CName || #tscolumndescription{name = CName} <- C],
Rows = riak_pb_ts_codec:decode_rows(R),
{ok, {ColumnNames, Rows}}.
14 changes: 10 additions & 4 deletions src/riakc_ts_put_operator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,17 @@
deserialize/1]).


serialize(TableName, ColumnNames, Measurements) ->
ColumnDescs = riak_pb_ts_codec:encode_columnnames(ColumnNames),
%% As of 2015-11-05, columns parameter is ignored, Riak TS
%% expects the full set of fields in each element of Data.
serialize(TableName, Measurements, true) ->
#tsputreq{table = iolist_to_binary(TableName),
columns = ColumnDescs,
rows = Measurements}.
columns = [],
rows = Measurements};
serialize(TableName, Measurements, false) ->
SerializedRows = riak_pb_ts_codec:encode_rows_non_strict(Measurements),
#tsputreq{table = TableName,
columns = [],
rows = SerializedRows}.

deserialize({error, {Code, Message}}) when is_integer(Code), is_list(Message) ->
{error, {Code, iolist_to_binary(Message)}};
Expand Down
5 changes: 4 additions & 1 deletion src/riakc_ts_query_operator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,13 @@ deserialize({error, {Code, Message}}) when is_integer(Code), is_atom(Message) ->
{error, {Code, iolist_to_binary(atom_to_list(Message))}};
deserialize({error, Message}) ->
{error, Message};

deserialize(tsqueryresp) ->
{[], []};
deserialize({tsqueryresp, {_, _, []}}) ->
{[], []};
deserialize({tsqueryresp, {ColumnNames, _ColumnTypes, Rows}}) ->
{ColumnNames, Rows};
deserialize(#tsqueryresp{columns = C, rows = R}) ->
ColumnNames = [ColName || #tscolumndescription{name = ColName} <- C],
Rows = riak_pb_ts_codec:decode_rows(R),
{ColumnNames, Rows}.

0 comments on commit 49142ee

Please sign in to comment.