Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions src/pgo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
query/2,
query/3,
query/4,
prepare/2,
prepare/3,
query_prepared/3,
query_prepared/4,
transaction/1,
transaction/2,
transaction/3,
Expand Down Expand Up @@ -148,6 +152,89 @@ query(Query, Params, Options, Conn=#conn{trace=TraceDefault,
#{queue_time => undefined})
end).

%% @doc Prepare a named statement on the given pool.
%% Returns {ok, Name, ParameterOIDs} which can be passed to query_prepared/3,4.
-spec prepare(iodata(), iodata()) -> {ok, iodata(), [pg_types:oid()]} | {error, term()}.
prepare(Name, Query) ->
prepare(Name, Query, #{}).

-spec prepare(iodata(), iodata(), options()) -> {ok, iodata(), [pg_types:oid()]} | {error, term()}.
prepare(Name, Query, Options) ->
pgo_prepared_cache:init(),
Pool = maps:get(pool, Options, default),
PoolOptions = maps:get(pool_options, Options, []),
case checkout(Pool, PoolOptions) of
{ok, Ref, Conn} ->
try
case pgo_handler:prepare(Conn, Name, Query) of
{ok, N, OIDs} = Result ->
pgo_prepared_cache:store(N, Query, OIDs),
Result;
Error ->
Error
end
after
checkin(Ref, Conn)
end;
{error, _} = E ->
E
end.

%% @doc Execute a previously prepared named statement.
%% ParameterOIDs is the list returned by prepare/2,3.
-spec query_prepared(iodata(), list(), [pg_types:oid()]) -> result().
query_prepared(Name, Params, ParameterOIDs) ->
query_prepared(Name, Params, ParameterOIDs, #{}).

-spec query_prepared(iodata(), list(), [pg_types:oid()], options()) -> result().
query_prepared(Name, Params, ParameterOIDs, Options) ->
pgo_prepared_cache:init(),
Pool = maps:get(pool, Options, default),
PoolOptions = maps:get(pool_options, Options, []),
DecodeOptions = maps:get(decode_opts, Options, []),
case checkout(Pool, PoolOptions) of
{ok, Ref={_, _, _, Holder}, Conn=#conn{owner=Owner, decode_opts=DefaultDecodeOpts}} ->
try
NameBin = iolist_to_binary(Name),
_ = maybe_prepare_on_conn(Owner, NameBin, Conn),
pgo_handler:prepared_query(Conn, Name, Params, ParameterOIDs,
DecodeOptions ++ DefaultDecodeOpts)
of
{error, closed} ->
maybe_timeout_error(Holder);
{error, einval} ->
maybe_timeout_error(Holder);
Result ->
Result
after
checkin(Ref, Conn)
end;
{error, _} = E ->
E
end.

maybe_prepare_on_conn(Owner, NameBin, Conn) ->
Key = {Owner, NameBin},
case pgo_prepared_cache:is_conn_prepared(Key) of
true ->
ok;
false ->
case pgo_prepared_cache:lookup(NameBin) of
{ok, Query, _OIDs} ->
case pgo_handler:prepare(Conn, NameBin, Query) of
{ok, _, _} ->
pgo_prepared_cache:mark_conn_prepared(Key);
{error, {pgsql_error, #{code := <<"42P05">>}}} ->
%% Already prepared (e.g. from a previous session)
pgo_prepared_cache:mark_conn_prepared(Key);
Error ->
Error
end;
not_found ->
ok
end
end.

%% @equiv transaction(default, Fun, [])
-spec transaction(fun(() -> any())) -> any() | {error, any()}.
transaction(Fun) ->
Expand Down
105 changes: 105 additions & 0 deletions src/pgo_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
extended_query/3,
extended_query/4,
extended_query/5,
prepare/3,
prepared_query/4,
prepared_query/5,
ping/1,
setopts/3,
simple_query/2,
Expand Down Expand Up @@ -78,6 +81,108 @@ extended_query(Socket, Query, Parameters, DecodeOptions, _Timings) ->
DecodeFun = proplists:get_value(decode_fun, DecodeOptions, undefined),
extended_query(Socket, Query, Parameters, DecodeOptions, DecodeFun, []).

%% @doc Parse a named prepared statement. Returns {ok, Name, ParameterOIDs}
%% on success. The statement is cached server-side per connection.
-spec prepare(#conn{}, iodata(), iodata()) -> {ok, iodata(), [pg_types:oid()]} | {error, term()}.
prepare(Conn=#conn{socket=Socket,
socket_module=SocketModule}, Name, Query) ->
_ = setopts(SocketModule, Socket, [{active, false}]),
ParseMessage = pgo_protocol:encode_parse_message(Name, Query, []),
DescribeMessage = pgo_protocol:encode_describe_message(statement, Name),
FlushMessage = pgo_protocol:encode_flush_message(),
SyncMessage = pgo_protocol:encode_sync_message(),
Packet = [ParseMessage, DescribeMessage, FlushMessage, SyncMessage],
Result = case SocketModule:send(Socket, Packet) of
ok ->
prepare_receive_loop(Name, Conn);
{error, _} = SendError ->
SendError
end,
_ = setopts(SocketModule, Socket, [{active, once}]),
Result.

prepare_receive_loop(Name, Conn=#conn{socket=Socket, socket_module=SocketModule}) ->
case receive_message(SocketModule, Socket, Conn, []) of
{ok, #parse_complete{}} ->
prepare_receive_loop_describe(Name, Conn);
{ok, #error_response{fields = Fields}} ->
flush_until_ready_for_query({error, {pgsql_error, Fields}}, Conn);
{error, _} = Error ->
Error
end.

prepare_receive_loop_describe(Name, Conn=#conn{socket=Socket, socket_module=SocketModule}) ->
case receive_message(SocketModule, Socket, Conn, []) of
{ok, #parameter_description{data_types=DataTypes}} ->
prepare_skip_to_ready(Name, DataTypes, Conn);
{ok, #error_response{fields = Fields}} ->
flush_until_ready_for_query({error, {pgsql_error, Fields}}, Conn);
{error, _} = Error ->
Error
end.

prepare_skip_to_ready(Name, DataTypes, Conn=#conn{socket=Socket, socket_module=SocketModule}) ->
case receive_message(SocketModule, Socket, Conn, []) of
{ok, #ready_for_query{}} ->
{ok, Name, DataTypes};
{ok, #error_response{fields = Fields}} ->
flush_until_ready_for_query({error, {pgsql_error, Fields}}, Conn);
{ok, #parameter_description{data_types = DTs}} ->
prepare_skip_to_ready(Name, DTs, Conn);
{ok, _} ->
prepare_skip_to_ready(Name, DataTypes, Conn);
{error, _} = Error ->
Error
end.

%% @doc Execute a previously prepared named statement. Skips PARSE entirely —
%% only sends BIND, DESCRIBE portal, EXECUTE, SYNC. The statement must have
%% been prepared on this connection via prepare/3 first.
-spec prepared_query(#conn{}, iodata(), list(), [pg_types:oid()]) -> pgo:result().
prepared_query(Conn, Name, Parameters, ParameterDataTypes) ->
prepared_query(Conn, Name, Parameters, ParameterDataTypes, []).

-spec prepared_query(#conn{}, iodata(), list(), [pg_types:oid()], pgo:decode_opts()) -> pgo:result().
prepared_query(Conn=#conn{socket=Socket,
socket_module=SocketModule},
Name, Parameters, ParameterDataTypes, DecodeOptions) ->
_ = setopts(SocketModule, Socket, [{active, false}]),
DecodeFun = proplists:get_value(decode_fun, DecodeOptions, undefined),
Result = case encode_bind_describe_execute_named(Conn, Name, Parameters, ParameterDataTypes) of
{ok, SinglePacket} ->
case SocketModule:send(Socket, SinglePacket) of
ok ->
try
receive_loop(bind_complete, DecodeFun, [], DecodeOptions, Conn)
catch
Class:Reason:Stacktrace ->
flush_until_ready_for_query(error, Conn),
erlang:raise(Class, Reason, Stacktrace)
end;
{error, _} = SendError ->
SendError
end;
{_, _} = Error ->
Error
end,
_ = setopts(SocketModule, Socket, [{active, once}]),
Result.

-spec encode_bind_describe_execute_named(pgo_pool:conn(), iodata(), [any()], [pg_types:oid()]) ->
{ok, iodata()} | {term(), any()}.
encode_bind_describe_execute_named(Conn, StatementName, Parameters, ParameterDataTypes) ->
DescribeMessage = pgo_protocol:encode_describe_message(portal, ""),
ExecuteMessage = pgo_protocol:encode_execute_message("", 0),
SyncMessage = pgo_protocol:encode_sync_message(),
try
BindMessage = pgo_protocol:encode_bind_message(Conn, "", StatementName, Parameters, ParameterDataTypes),
SinglePacket = [BindMessage, DescribeMessage, ExecuteMessage, SyncMessage],
{ok, SinglePacket}
catch
Class:Exception ->
{Class, Exception}
end.

-spec ping(#conn{}) -> ok | {error, term()}.
ping(Conn=#conn{socket=Socket,
socket_module=SocketModule}) ->
Expand Down
48 changes: 48 additions & 0 deletions src/pgo_prepared_cache.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
%% @doc ETS-based cache for prepared statement metadata.
%%
%% Stores statement name to {query, parameter OIDs} mappings, and tracks
%% which connections have each statement prepared.
-module(pgo_prepared_cache).

-export([init/0, store/3, lookup/1, is_conn_prepared/1, mark_conn_prepared/1]).

-define(TABLE, pgo_prepared_cache).
-define(CONN_TABLE, pgo_prepared_conn_cache).

%% @doc Initialize cache tables. Safe to call multiple times.
init() ->
init_table(?TABLE),
init_table(?CONN_TABLE).

%% @doc Store a prepared statement's query and parameter OIDs.
-spec store(iodata(), iodata(), [pg_types:oid()]) -> ok.
store(Name, Query, OIDs) ->
ets:insert(?TABLE, {iolist_to_binary(Name), iolist_to_binary(Query), OIDs}),
ok.

%% @doc Look up a prepared statement's query and OIDs by name.
-spec lookup(iodata()) -> {ok, binary(), [pg_types:oid()]} | not_found.
lookup(Name) ->
case ets:lookup(?TABLE, iolist_to_binary(Name)) of
[{_, Query, OIDs}] -> {ok, Query, OIDs};
[] -> not_found
end.

%% @doc Check if a statement has been prepared on a specific connection.
-spec is_conn_prepared({pid(), binary()}) -> boolean().
is_conn_prepared(Key) ->
ets:member(?CONN_TABLE, Key).

%% @doc Mark a statement as prepared on a specific connection.
-spec mark_conn_prepared({pid(), binary()}) -> ok.
mark_conn_prepared(Key) ->
ets:insert(?CONN_TABLE, {Key}),
ok.

init_table(Name) ->
case ets:whereis(Name) of
undefined ->
ets:new(Name, [named_table, public, set, {read_concurrency, true}]);
_ ->
ok
end.
Loading
Loading