View Source file_log_reader (util v1.1.5)
Periodically read an append-only log file and parse newly added data.
The user controls the interval in msec how often to check for file modifications. When new data is appended to file it triggers invocation of the user-defined parsing function that deliminates the file, and the result is delivered to the consumer by calling the consumer callback function.
The log reader can be started as agen_server
or can be controlled synchronously by using init/3
, run/1
, and close/1
methods.
Link to this section Summary
Types
Details:
- pos
- Start reading from this position (default: 0)
- end_pos
- Read until this position and stop. If provided and file position reaches
end_pos
, the consumer() callback given to the reader will be called as:Consumer({'$end_of_file', Filename::string(), Result}, Pos::integer(), State)
whereResult
isok
or{error|exit|exception, Error::any(), StackTrace}
if an error occured. - max_size
- Maximum chunk size to read from file in a single pass (default: 32M).
- timeout
- Number of milliseconds between successive file scanning (default: 1000)
- retry_sec
- Number of seconds between successive retries upon failure to open the market data file passed to one of the
start*/{3,4}
functions (default: 15). The value of 0 means that the file must exist or else the process won't start. - parser
- Is the function to be called when the next chunk is read from file. The function must return:
{ok, Msg, Tail, State}
- invoke
Consumer
callback passing it the parsed messageMsg
, and continue parsing theTail
binary {incomplete, State}
- the data contains no complete messages - wait until there's more
{skip, Tail, State}
- disregard input and continue parsing
Tail
without callingConsumer
callback
- pstate
- Initial value of the parser state or a functor
fun((File::string() Consumer::consumer(), Options::options()) -> PState::any())
- pstate_update
- Update function of the parser state. Called when the user invokes
update_pstate/3
Functions
Close file processor (use this method when not using gen_server)
When using file processor without gen_server, use this function to initialize the state, and then call run/1.
Report last processed file position/size.
Return current parser state (
{pstate, any()}
initialization option).Process file from given position
Pos
to EndPos
(or eof
).Start the server outside of supervision tree.
Process
File
by calling Consumer
callback on every delimited message. Message delimination is handled by the {parser, Parser}
option. Consumer
function gets called iteratively with the following arguments:- (Msg, Pos::integer(), State)
Msg
is what the parser function returned.Pos
is current file position following theMsg
.State
is current value of parser state that is initialized by{pstate, PState}
option given to thestart_link/{3,4}
function- ({'$end_of_file', Filename::string(), Result}, Pos::integer(), PState)
- This call happens when end of file condition is reached (see definition of
consumer()
type)
Consumer
can end processing normally without reaching the end of file by throwing {eof, PState}
exception.To be called by the supervisor in order to start the server. If init/1 fails with Reason, the function returns
{error,Reason}
. If init/1 returns {stop,Reason}
or ignore, the process is terminated and the function returns {error,Reason}
or ignore, respectively.See also: start_link/3.
Stop the server.
Update parser state.
Link to this section Types
-type consumer() ::
fun((Msg ::
any() |
{'$end_of_file',
string(),
Res :: ok | {error | throw | exit, Reason :: any(), Stacktrace :: list()}},
Pos :: integer(),
State :: any()) ->
NewState :: any()).
-type options() :: [{pos, StartPos :: integer()} | {end_pos, ReadUntilPos :: integer() | eof} | {max_size, MaxReadSize :: integer() | eof} | {timeout, MSec :: integer()} | {retry_sec, Sec :: integer()} | {parser, fun((Data :: binary(), ParserState :: any()) -> {ok, Msg :: any(), Tail :: binary(), NewParserState :: any()} | {incomplete, NewParserState :: any()} | {skip, Tail :: binary(), NewParserState :: any()}) | {Mod :: atom(), Fun :: atom()}} | {pstate, fun((File :: string(), consumer(), Options :: list()) -> any()) | any()} | {pstate_update, fun((Option :: atom(), Value :: any(), PState :: any()) -> {ok, NewPState :: any()} | {error, any()})}].
- pos
- Start reading from this position (default: 0)
- end_pos
- Read until this position and stop. If provided and file position reaches
end_pos
, the consumer() callback given to the reader will be called as:Consumer({'$end_of_file', Filename::string(), Result}, Pos::integer(), State)
whereResult
isok
or{error|exit|exception, Error::any(), StackTrace}
if an error occured. - max_size
- Maximum chunk size to read from file in a single pass (default: 32M).
- timeout
- Number of milliseconds between successive file scanning (default: 1000)
- retry_sec
- Number of seconds between successive retries upon failure to open the market data file passed to one of the
start*/{3,4}
functions (default: 15). The value of 0 means that the file must exist or else the process won't start. - parser
- Is the function to be called when the next chunk is read from file. The function must return:
{ok, Msg, Tail, State}
- invoke
Consumer
callback passing it the parsed messageMsg
, and continue parsing theTail
binary {incomplete, State}
- the data contains no complete messages - wait until there's more
{skip, Tail, State}
- disregard input and continue parsing
Tail
without callingConsumer
callback
- pstate
- Initial value of the parser state or a functor
fun((File::string() Consumer::consumer(), Options::options()) -> PState::any())
- pstate_update
- Update function of the parser state. Called when the user invokes
update_pstate/3
Link to this section Functions
-spec position(pid() | atom()) -> {ok, Position :: integer()}.
-spec pstate(pid() | atom()) -> {ok, any()}.
{pstate, any()}
initialization option).
-spec run(#state{}) -> #state{}.
Pos
to EndPos
(or eof
).
File
by calling Consumer
callback on every delimited message. Message delimination is handled by the {parser, Parser}
option. Consumer
function gets called iteratively with the following arguments:- (Msg, Pos::integer(), State)
Msg
is what the parser function returned.Pos
is current file position following theMsg
.State
is current value of parser state that is initialized by{pstate, PState}
option given to thestart_link/{3,4}
function- ({'$end_of_file', Filename::string(), Result}, Pos::integer(), PState)
- This call happens when end of file condition is reached (see definition of
consumer()
type)
Consumer
can end processing normally without reaching the end of file by throwing {eof, PState}
exception.
{error,Reason}
. If init/1 returns {stop,Reason}
or ignore, the process is terminated and the function returns {error,Reason}
or ignore, respectively.See also: start_link/3.
-spec stop(pid() | atom()) -> ok.
-spec update_pstate(pid(), Option :: atom(), Value :: any()) -> {ok, State :: any()}.