refactor(io): make rstream use a linear buffer

If you like it you shouldn't put a ring on it.

This is what _every_ consumer of RStream used anyway, either by calling
rbuffer_reset, or rbuffer_consumed_compact (same as rbuffer_reset
without needing a scratch buffer), or by consuming everything in
each stream_read_cb call directly.
This commit is contained in:
bfredl 2024-05-31 14:40:53 +02:00
parent 4881211097
commit 78d21593a3
18 changed files with 240 additions and 973 deletions

View File

@ -39,8 +39,6 @@
#include "nvim/os/os_defs.h"
#include "nvim/os/shell.h"
#include "nvim/path.h"
#include "nvim/rbuffer.h"
#include "nvim/rbuffer_defs.h"
#include "nvim/terminal.h"
#include "nvim/types_defs.h"
@ -432,7 +430,7 @@ Channel *channel_job_start(char **argv, const char *exepath, CallbackReader on_s
wstream_init(&proc->in, 0);
}
if (has_out) {
rstream_init(&proc->out, 0);
rstream_init(&proc->out);
}
if (rpc) {
@ -447,7 +445,7 @@ Channel *channel_job_start(char **argv, const char *exepath, CallbackReader on_s
if (has_err) {
callback_reader_start(&chan->on_stderr, "stderr");
rstream_init(&proc->err, 0);
rstream_init(&proc->err);
rstream_start(&proc->err, on_job_stderr, chan);
}
@ -484,7 +482,7 @@ uint64_t channel_connect(bool tcp, const char *address, bool rpc, CallbackReader
channel->stream.socket.s.internal_close_cb = close_cb;
channel->stream.socket.s.internal_data = channel;
wstream_init(&channel->stream.socket.s, 0);
rstream_init(&channel->stream.socket, 0);
rstream_init(&channel->stream.socket);
if (rpc) {
rpc_start(channel);
@ -509,7 +507,7 @@ void channel_from_connection(SocketWatcher *watcher)
channel->stream.socket.s.internal_close_cb = close_cb;
channel->stream.socket.s.internal_data = channel;
wstream_init(&channel->stream.socket.s, 0);
rstream_init(&channel->stream.socket, 0);
rstream_init(&channel->stream.socket);
rpc_start(channel);
channel_create_event(channel, watcher->addr);
}
@ -554,7 +552,7 @@ uint64_t channel_from_stdio(bool rpc, CallbackReader on_output, const char **err
dup2(STDERR_FILENO, STDIN_FILENO);
}
#endif
rstream_init_fd(&main_loop, &channel->stream.stdio.in, stdin_dup_fd, 0);
rstream_init_fd(&main_loop, &channel->stream.stdio.in, stdin_dup_fd);
wstream_init_fd(&main_loop, &channel->stream.stdio.out, stdout_dup_fd, 0);
if (rpc) {
@ -648,51 +646,38 @@ static inline list_T *buffer_to_tv_list(const char *const buf, const size_t len)
return l;
}
void on_channel_data(RStream *stream, RBuffer *buf, size_t count, void *data, bool eof)
size_t on_channel_data(RStream *stream, const char *buf, size_t count, void *data, bool eof)
{
Channel *chan = data;
on_channel_output(stream, chan, buf, eof, &chan->on_data);
return on_channel_output(stream, chan, buf, count, eof, &chan->on_data);
}
void on_job_stderr(RStream *stream, RBuffer *buf, size_t count, void *data, bool eof)
size_t on_job_stderr(RStream *stream, const char *buf, size_t count, void *data, bool eof)
{
Channel *chan = data;
on_channel_output(stream, chan, buf, eof, &chan->on_stderr);
return on_channel_output(stream, chan, buf, count, eof, &chan->on_stderr);
}
static void on_channel_output(RStream *stream, Channel *chan, RBuffer *buf, bool eof,
CallbackReader *reader)
static size_t on_channel_output(RStream *stream, Channel *chan, const char *buf, size_t count,
bool eof, CallbackReader *reader)
{
size_t count;
char *output = rbuffer_read_ptr(buf, &count);
if (chan->term) {
if (!eof) {
char *p = output;
char *end = output + count;
if (count) {
const char *p = buf;
const char *end = buf + count;
while (p < end) {
// Don't pass incomplete UTF-8 sequences to libvterm. #16245
// Composing chars can be passed separately, so utf_ptr2len_len() is enough.
int clen = utf_ptr2len_len(p, (int)(end - p));
if (clen > end - p) {
count = (size_t)(p - output);
count = (size_t)(p - buf);
break;
}
p += clen;
}
}
terminal_receive(chan->term, output, count);
}
if (count) {
rbuffer_consumed(buf, count);
}
// Move remaining data to start of buffer, so the buffer can never wrap around.
rbuffer_reset(buf);
if (callback_reader_set(*reader)) {
ga_concat_len(&reader->buffer, output, count);
terminal_receive(chan->term, buf, count);
}
if (eof) {
@ -700,8 +685,11 @@ static void on_channel_output(RStream *stream, Channel *chan, RBuffer *buf, bool
}
if (callback_reader_set(*reader)) {
ga_concat_len(&reader->buffer, buf, count);
schedule_channel_event(chan);
}
return count;
}
/// schedule the necessary callbacks to be invoked as a deferred event

View File

@ -6,7 +6,6 @@
#include <uv.h>
#include "nvim/eval/typval_defs.h"
#include "nvim/rbuffer_defs.h"
#include "nvim/types_defs.h"
enum { EVENT_HANDLER_MAX_ARGC = 10, };
@ -59,11 +58,13 @@ typedef struct rstream RStream;
/// Type of function called when the RStream buffer is filled with data
///
/// @param stream The Stream instance
/// @param buf The associated RBuffer instance
/// @param read_data data that was read
/// @param count Number of bytes that was read.
/// @param data User-defined data
/// @param eof If the stream reached EOF.
typedef void (*stream_read_cb)(RStream *stream, RBuffer *buf, size_t count, void *data, bool eof);
/// @return number of bytes which were consumed
typedef size_t (*stream_read_cb)(RStream *stream, const char *read_data, size_t count, void *data,
bool eof);
/// Type of function called when the Stream has information about a write
/// request.
@ -102,11 +103,16 @@ struct stream {
struct rstream {
Stream s;
bool did_eof;
RBuffer *buffer;
bool want_read;
bool pending_read;
bool paused_full;
char *buffer; // ARENA_BLOCK_SIZE
char *read_pos;
char *write_pos;
uv_buf_t uvbuf;
stream_read_cb read_cb;
size_t num_bytes;
size_t fpos;
int64_t fpos;
};
#define ADDRESS_MAX_SIZE 256

View File

@ -18,7 +18,6 @@
#include "nvim/os/pty_process.h"
#include "nvim/os/shell.h"
#include "nvim/os/time.h"
#include "nvim/rbuffer_defs.h"
#include "nvim/ui_client.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
@ -355,7 +354,7 @@ static void flush_stream(Process *proc, RStream *stream)
int err = uv_recv_buffer_size((uv_handle_t *)&stream->s.uv.pipe,
&system_buffer_size);
if (err) {
system_buffer_size = (int)rbuffer_capacity(stream->buffer);
system_buffer_size = ARENA_BLOCK_SIZE;
}
size_t max_bytes = stream->num_bytes + (size_t)system_buffer_size;

View File

@ -11,38 +11,44 @@
#include "nvim/macros_defs.h"
#include "nvim/main.h"
#include "nvim/os/os_defs.h"
#include "nvim/rbuffer.h"
#include "nvim/rbuffer_defs.h"
#include "nvim/types_defs.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "event/rstream.c.generated.h"
#endif
void rstream_init_fd(Loop *loop, RStream *stream, int fd, size_t bufsize)
void rstream_init_fd(Loop *loop, RStream *stream, int fd)
FUNC_ATTR_NONNULL_ARG(1, 2)
{
stream_init(loop, &stream->s, fd, NULL);
rstream_init(stream, bufsize);
rstream_init(stream);
}
void rstream_init_stream(RStream *stream, uv_stream_t *uvstream, size_t bufsize)
void rstream_init_stream(RStream *stream, uv_stream_t *uvstream)
FUNC_ATTR_NONNULL_ARG(1, 2)
{
stream_init(NULL, &stream->s, -1, uvstream);
rstream_init(stream, bufsize);
rstream_init(stream);
}
void rstream_init(RStream *stream, size_t bufsize)
void rstream_init(RStream *stream)
FUNC_ATTR_NONNULL_ARG(1)
{
stream->fpos = 0;
stream->read_cb = NULL;
stream->num_bytes = 0;
stream->buffer = rbuffer_new(bufsize);
stream->buffer->data = stream;
stream->buffer->full_cb = on_rbuffer_full;
stream->buffer->nonfull_cb = on_rbuffer_nonfull;
stream->buffer = alloc_block();
stream->read_pos = stream->write_pos = stream->buffer;
}
void rstream_start_inner(RStream *stream)
FUNC_ATTR_NONNULL_ARG(1)
{
if (stream->s.uvstream) {
uv_read_start(stream->s.uvstream, alloc_cb, read_cb);
} else {
uv_idle_start(&stream->s.uv.idle, fread_idle_cb);
}
}
/// Starts watching for events from a `Stream` instance.
@ -53,17 +59,16 @@ void rstream_start(RStream *stream, stream_read_cb cb, void *data)
{
stream->read_cb = cb;
stream->s.cb_data = data;
if (stream->s.uvstream) {
uv_read_start(stream->s.uvstream, alloc_cb, read_cb);
} else {
uv_idle_start(&stream->s.uv.idle, fread_idle_cb);
stream->want_read = true;
if (!stream->paused_full) {
rstream_start_inner(stream);
}
}
/// Stops watching for events from a `Stream` instance.
///
/// @param stream The `Stream` instance
void rstream_stop(RStream *stream)
void rstream_stop_inner(RStream *stream)
FUNC_ATTR_NONNULL_ALL
{
if (stream->s.uvstream) {
@ -73,16 +78,14 @@ void rstream_stop(RStream *stream)
}
}
static void on_rbuffer_full(RBuffer *buf, void *data)
/// Stops watching for events from a `Stream` instance.
///
/// @param stream The `Stream` instance
void rstream_stop(RStream *stream)
FUNC_ATTR_NONNULL_ALL
{
rstream_stop(data);
}
static void on_rbuffer_nonfull(RBuffer *buf, void *data)
{
RStream *stream = data;
assert(stream->read_cb);
rstream_start(stream, stream->read_cb, stream->s.cb_data);
rstream_stop_inner(stream);
stream->want_read = false;
}
// Callbacks used by libuv
@ -91,10 +94,9 @@ static void on_rbuffer_nonfull(RBuffer *buf, void *data)
static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
{
RStream *stream = handle->data;
// `uv_buf_t.len` happens to have different size on Windows.
size_t write_count;
buf->base = rbuffer_write_ptr(stream->buffer, &write_count);
buf->len = UV_BUF_LEN(write_count);
buf->base = stream->write_pos;
// `uv_buf_t.len` happens to have different size on Windows (as a treat)
buf->len = UV_BUF_LEN(rstream_space(stream));
}
/// Callback invoked by libuv after it copies the data into the buffer provided
@ -108,21 +110,21 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
// cnt == 0 means libuv asked for a buffer and decided it wasn't needed:
// http://docs.libuv.org/en/latest/stream.html#c.uv_read_start.
//
// We don't need to do anything with the RBuffer because the next call
// We don't need to do anything with the buffer because the next call
// to `alloc_cb` will return the same unused pointer (`rbuffer_produced`
// won't be called)
if (cnt == UV_ENOBUFS || cnt == 0) {
return;
} else if (cnt == UV_EOF && uvstream->type == UV_TTY) {
// The TTY driver might signal EOF without closing the stream
invoke_read_cb(stream, 0, true);
invoke_read_cb(stream, true);
} else {
DLOG("closing Stream (%p): %s (%s)", (void *)stream,
uv_err_name((int)cnt), os_strerror((int)cnt));
// Read error or EOF, either way stop the stream and invoke the callback
// with eof == true
uv_read_stop(uvstream);
invoke_read_cb(stream, 0, true);
invoke_read_cb(stream, true);
}
return;
}
@ -130,10 +132,13 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
// at this point we're sure that cnt is positive, no error occurred
size_t nread = (size_t)cnt;
stream->num_bytes += nread;
// Data was already written, so all we need is to update 'wpos' to reflect
// the space actually used in the buffer.
rbuffer_produced(stream->buffer, nread);
invoke_read_cb(stream, nread, false);
stream->write_pos += cnt;
invoke_read_cb(stream, false);
}
static size_t rstream_space(RStream *stream)
{
return (size_t)((stream->buffer + ARENA_BLOCK_SIZE) - stream->write_pos);
}
/// Called by the by the 'idle' handle to emulate a reading event
@ -146,52 +151,37 @@ static void fread_idle_cb(uv_idle_t *handle)
uv_fs_t req;
RStream *stream = handle->data;
stream->uvbuf.base = stream->write_pos;
// `uv_buf_t.len` happens to have different size on Windows.
size_t write_count;
stream->uvbuf.base = rbuffer_write_ptr(stream->buffer, &write_count);
stream->uvbuf.len = UV_BUF_LEN(write_count);
// the offset argument to uv_fs_read is int64_t, could someone really try
// to read more than 9 quintillion (9e18) bytes?
// upcast is meant to avoid tautological condition warning on 32 bits
uintmax_t fpos_intmax = stream->fpos;
if (fpos_intmax > INT64_MAX) {
ELOG("stream offset overflow");
preserve_exit("stream offset overflow");
}
stream->uvbuf.len = UV_BUF_LEN(rstream_space(stream));
// Synchronous read
uv_fs_read(handle->loop,
&req,
stream->s.fd,
&stream->uvbuf,
1,
(int64_t)stream->fpos,
NULL);
uv_fs_read(handle->loop, &req, stream->s.fd, &stream->uvbuf, 1, stream->fpos, NULL);
uv_fs_req_cleanup(&req);
if (req.result <= 0) {
uv_idle_stop(&stream->s.uv.idle);
invoke_read_cb(stream, 0, true);
invoke_read_cb(stream, true);
return;
}
// no errors (req.result (ssize_t) is positive), it's safe to cast.
size_t nread = (size_t)req.result;
rbuffer_produced(stream->buffer, nread);
stream->fpos += nread;
invoke_read_cb(stream, nread, false);
// no errors (req.result (ssize_t) is positive), it's safe to use.
stream->write_pos += req.result;
stream->fpos += req.result;
invoke_read_cb(stream, false);
}
static void read_event(void **argv)
{
RStream *stream = argv[0];
stream->pending_read = false;
if (stream->read_cb) {
size_t count = (uintptr_t)argv[1];
bool eof = (uintptr_t)argv[2];
stream->did_eof = eof;
stream->read_cb(stream, stream->buffer, count, stream->s.cb_data, eof);
size_t available = rstream_available(stream);
size_t consumed = stream->read_cb(stream, stream->read_pos, available, stream->s.cb_data,
stream->did_eof);
assert(consumed <= available);
rstream_consume(stream, consumed);
}
stream->s.pending_reqs--;
if (stream->s.closed && !stream->s.pending_reqs) {
@ -199,13 +189,48 @@ static void read_event(void **argv)
}
}
static void invoke_read_cb(RStream *stream, size_t count, bool eof)
size_t rstream_available(RStream *stream)
{
return (size_t)(stream->write_pos - stream->read_pos);
}
void rstream_consume(RStream *stream, size_t consumed)
{
stream->read_pos += consumed;
size_t remaining = (size_t)(stream->write_pos - stream->read_pos);
if (remaining > 0 && stream->read_pos > stream->buffer) {
memmove(stream->buffer, stream->read_pos, remaining);
stream->read_pos = stream->buffer;
stream->write_pos = stream->buffer + remaining;
} else if (remaining == 0) {
stream->read_pos = stream->write_pos = stream->buffer;
}
if (stream->want_read && stream->paused_full && rstream_space(stream)) {
assert(stream->read_cb);
stream->paused_full = false;
rstream_start_inner(stream);
}
}
static void invoke_read_cb(RStream *stream, bool eof)
{
stream->did_eof |= eof;
if (!rstream_space(stream)) {
rstream_stop_inner(stream);
stream->paused_full = true;
}
// we cannot use pending_reqs as a socket can have both pending reads and writes
if (stream->pending_read) {
return;
}
// Don't let the stream be closed before the event is processed.
stream->s.pending_reqs++;
CREATE_EVENT(stream->s.events, read_event,
stream, (void *)(uintptr_t *)count, (void *)(uintptr_t)eof);
stream->pending_read = true;
CREATE_EVENT(stream->s.events, read_event, stream);
}
void rstream_may_close(RStream *stream)

View File

@ -8,7 +8,6 @@
#include "nvim/event/loop.h"
#include "nvim/event/stream.h"
#include "nvim/log.h"
#include "nvim/rbuffer.h"
#include "nvim/types_defs.h"
#ifdef MSWIN
# include "nvim/os/os_win_console.h"
@ -149,7 +148,7 @@ static void rstream_close_cb(uv_handle_t *handle)
{
RStream *stream = handle->data;
if (stream->buffer) {
rbuffer_free(stream->buffer);
free_block(stream->buffer);
}
close_cb(handle);
}

View File

@ -31,8 +31,6 @@
#include "nvim/msgpack_rpc/packer.h"
#include "nvim/msgpack_rpc/unpacker.h"
#include "nvim/os/input.h"
#include "nvim/rbuffer.h"
#include "nvim/rbuffer_defs.h"
#include "nvim/types_defs.h"
#include "nvim/ui.h"
#include "nvim/ui_client.h"
@ -202,10 +200,25 @@ Object rpc_send_call(uint64_t id, const char *method_name, Array args, ArenaMem
return frame.errored ? NIL : frame.result;
}
static void receive_msgpack(RStream *stream, RBuffer *rbuf, size_t c, void *data, bool eof)
static size_t receive_msgpack(RStream *stream, const char *rbuf, size_t c, void *data, bool eof)
{
Channel *channel = data;
channel_incref(channel);
size_t consumed = 0;
DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p",
channel->id, c, (void *)stream);
if (c > 0) {
Unpacker *p = channel->rpc.unpacker;
p->read_ptr = rbuf;
p->read_size = c;
parse_msgpack(channel);
if (!unpacker_closed(p)) {
consumed = c - p->read_size;
}
}
if (eof) {
channel_close(channel->id, kChannelPartRpc, NULL);
@ -213,25 +226,10 @@ static void receive_msgpack(RStream *stream, RBuffer *rbuf, size_t c, void *data
snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client",
channel->id);
chan_close_with_error(channel, buf, LOGLVL_INF);
goto end;
}
DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p",
channel->id, rbuffer_size(rbuf), (void *)stream);
Unpacker *p = channel->rpc.unpacker;
size_t size = 0;
p->read_ptr = rbuffer_read_ptr(rbuf, &size);
p->read_size = size;
parse_msgpack(channel);
if (!unpacker_closed(p)) {
size_t consumed = size - p->read_size;
rbuffer_consumed_compact(rbuf, consumed);
}
end:
channel_decref(channel);
return consumed;
}
static ChannelCallFrame *find_call_frame(RpcState *rpc, uint32_t request_id)

View File

@ -21,8 +21,6 @@
#include "nvim/os/fileio.h"
#include "nvim/os/fs.h"
#include "nvim/os/os_defs.h"
#include "nvim/rbuffer.h"
#include "nvim/rbuffer_defs.h"
#include "nvim/types_defs.h"
#ifdef HAVE_SYS_UIO_H

View File

@ -4,7 +4,6 @@
#include <stdint.h>
#include "nvim/func_attr.h"
#include "nvim/rbuffer_defs.h"
/// Structure used to read from/write to file
typedef struct {

View File

@ -27,8 +27,6 @@
#include "nvim/os/os_defs.h"
#include "nvim/os/time.h"
#include "nvim/profile.h"
#include "nvim/rbuffer.h"
#include "nvim/rbuffer_defs.h"
#include "nvim/state.h"
#include "nvim/state_defs.h"
@ -62,7 +60,7 @@ void input_start(void)
}
used_stdin = true;
rstream_init_fd(&main_loop, &read_stream, STDIN_FILENO, READ_BUFFER_SIZE);
rstream_init_fd(&main_loop, &read_stream, STDIN_FILENO);
rstream_start(&read_stream, input_read_cb, NULL);
}
@ -157,7 +155,7 @@ int os_inchar(uint8_t *buf, int maxlen, int ms, int tb_change_cnt, MultiQueue *e
if (maxlen && input_available()) {
restart_cursorhold_wait(tb_change_cnt);
// Safe to convert rbuffer_read to int, it will never overflow since
// Safe to convert `to_read` to int, it will never overflow since
// INPUT_BUFFER_SIZE fits in an int
size_t to_read = MIN((size_t)maxlen, input_available());
memcpy(buf, input_read_pos, to_read);
@ -497,17 +495,15 @@ static InbufPollResult inbuf_poll(int ms, MultiQueue *events)
return input_eof ? kInputEof : kInputNone;
}
static void input_read_cb(RStream *stream, RBuffer *buf, size_t c, void *data, bool at_eof)
static size_t input_read_cb(RStream *stream, const char *buf, size_t c, void *data, bool at_eof)
{
if (at_eof) {
input_eof = true;
}
assert(input_space() >= rbuffer_size(buf));
RBUFFER_UNTIL_EMPTY(buf, ptr, len) {
input_enqueue_raw(ptr, len);
rbuffer_consumed(buf, len);
}
assert(input_space() >= c);
input_enqueue_raw(buf, c);
return c;
}
static void process_ctrl_c(void)

View File

@ -40,8 +40,6 @@
#include "nvim/path.h"
#include "nvim/pos_defs.h"
#include "nvim/profile.h"
#include "nvim/rbuffer.h"
#include "nvim/rbuffer_defs.h"
#include "nvim/state_defs.h"
#include "nvim/strings.h"
#include "nvim/tag.h"
@ -907,9 +905,9 @@ static int do_os_system(char **argv, const char *input, size_t len, char **outpu
if (has_input) {
wstream_init(&proc->in, 0);
}
rstream_init(&proc->out, 0);
rstream_init(&proc->out);
rstream_start(&proc->out, data_cb, &buf);
rstream_init(&proc->err, 0);
rstream_init(&proc->err);
rstream_start(&proc->err, data_cb, &buf);
// write the input, if any
@ -988,14 +986,14 @@ static void dynamic_buffer_ensure(DynamicBuffer *buf, size_t desired)
buf->data = xrealloc(buf->data, buf->cap);
}
static void system_data_cb(RStream *stream, RBuffer *buf, size_t count, void *data, bool eof)
static size_t system_data_cb(RStream *stream, const char *buf, size_t count, void *data, bool eof)
{
DynamicBuffer *dbuf = data;
size_t nread = buf->size;
dynamic_buffer_ensure(dbuf, dbuf->len + nread + 1);
rbuffer_read(buf, dbuf->data + dbuf->len, nread);
dbuf->len += nread;
dynamic_buffer_ensure(dbuf, dbuf->len + count + 1);
memcpy(dbuf->data + dbuf->len, buf, count);
dbuf->len += count;
return count;
}
/// Tracks output received for the current executing shell command, and displays
@ -1078,7 +1076,7 @@ static bool out_data_decide_throttle(size_t size)
///
/// @param output Data to save, or NULL to invoke a special mode.
/// @param size Length of `output`.
static void out_data_ring(char *output, size_t size)
static void out_data_ring(const char *output, size_t size)
{
#define MAX_CHUNK_SIZE (OUT_DATA_THRESHOLD / 2)
static char last_skipped[MAX_CHUNK_SIZE]; // Saved output.
@ -1120,11 +1118,11 @@ static void out_data_ring(char *output, size_t size)
/// @param output Data to append to screen lines.
/// @param count Size of data.
/// @param eof If true, there will be no more data output.
static void out_data_append_to_screen(char *output, size_t *count, bool eof)
static void out_data_append_to_screen(const char *output, size_t *count, bool eof)
FUNC_ATTR_NONNULL_ALL
{
char *p = output;
char *end = output + *count;
const char *p = output;
const char *end = output + *count;
while (p < end) {
if (*p == '\n' || *p == '\r' || *p == TAB || *p == BELL) {
msg_putchar_attr((uint8_t)(*p), 0);
@ -1152,25 +1150,16 @@ end:
ui_flush();
}
static void out_data_cb(RStream *stream, RBuffer *buf, size_t count, void *data, bool eof)
static size_t out_data_cb(RStream *stream, const char *ptr, size_t count, void *data, bool eof)
{
size_t cnt;
char *ptr = rbuffer_read_ptr(buf, &cnt);
if (ptr != NULL && cnt > 0
&& out_data_decide_throttle(cnt)) { // Skip output above a threshold.
if (count > 0 && out_data_decide_throttle(count)) { // Skip output above a threshold.
// Save the skipped output. If it is the final chunk, we display it later.
out_data_ring(ptr, cnt);
} else if (ptr != NULL) {
out_data_append_to_screen(ptr, &cnt, eof);
out_data_ring(ptr, count);
} else if (count > 0) {
out_data_append_to_screen(ptr, &count, eof);
}
if (cnt) {
rbuffer_consumed(buf, cnt);
}
// Move remaining data to start of buffer, so the buffer can never wrap around.
rbuffer_reset(buf);
return count;
}
/// Parses a command string into a sequence of words, taking quotes into

View File

@ -1,230 +0,0 @@
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>
#include "nvim/macros_defs.h"
#include "nvim/memory.h"
#include "nvim/rbuffer.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "rbuffer.c.generated.h"
#endif
/// Creates a new `RBuffer` instance.
RBuffer *rbuffer_new(size_t capacity)
FUNC_ATTR_WARN_UNUSED_RESULT FUNC_ATTR_NONNULL_RET
{
if (!capacity) {
capacity = 0x10000;
}
RBuffer *rv = xcalloc(1, sizeof(RBuffer) + capacity);
rv->full_cb = rv->nonfull_cb = NULL;
rv->data = NULL;
rv->size = 0;
rv->write_ptr = rv->read_ptr = rv->start_ptr;
rv->end_ptr = rv->start_ptr + capacity;
rv->temp = NULL;
return rv;
}
void rbuffer_free(RBuffer *buf) FUNC_ATTR_NONNULL_ALL
{
xfree(buf->temp);
xfree(buf);
}
/// Return a pointer to a raw buffer containing the first empty slot available
/// for writing. The second argument is a pointer to the maximum number of
/// bytes that could be written.
///
/// It is necessary to call this function twice to ensure all empty space was
/// used. See RBUFFER_UNTIL_FULL for a macro that simplifies this task.
char *rbuffer_write_ptr(RBuffer *buf, size_t *write_count) FUNC_ATTR_NONNULL_ALL
{
if (buf->size == rbuffer_capacity(buf)) {
*write_count = 0;
return NULL;
}
if (buf->write_ptr >= buf->read_ptr) {
*write_count = (size_t)(buf->end_ptr - buf->write_ptr);
} else {
*write_count = (size_t)(buf->read_ptr - buf->write_ptr);
}
return buf->write_ptr;
}
// Reset an RBuffer so read_ptr is at the beginning of the memory. If
// necessary, this moves existing data by allocating temporary memory.
void rbuffer_reset(RBuffer *buf) FUNC_ATTR_NONNULL_ALL
{
size_t temp_size;
if ((temp_size = rbuffer_size(buf))) {
if (buf->temp == NULL) {
buf->temp = xcalloc(1, rbuffer_capacity(buf));
}
rbuffer_read(buf, buf->temp, buf->size);
}
buf->read_ptr = buf->write_ptr = buf->start_ptr;
if (temp_size) {
rbuffer_write(buf, buf->temp, temp_size);
}
}
/// Adjust `rbuffer` write pointer to reflect produced data. This is called
/// automatically by `rbuffer_write`, but when using `rbuffer_write_ptr`
/// directly, this needs to called after the data was copied to the internal
/// buffer. The write pointer will be wrapped if required.
void rbuffer_produced(RBuffer *buf, size_t count) FUNC_ATTR_NONNULL_ALL
{
assert(count && count <= rbuffer_space(buf));
buf->write_ptr += count;
if (buf->write_ptr >= buf->end_ptr) {
// wrap around
buf->write_ptr -= rbuffer_capacity(buf);
}
buf->size += count;
if (buf->full_cb && !rbuffer_space(buf)) {
buf->full_cb(buf, buf->data);
}
}
/// Return a pointer to a raw buffer containing the first byte available
/// for reading. The second argument is a pointer to the maximum number of
/// bytes that could be read.
///
/// It is necessary to call this function twice to ensure all available bytes
/// were read. See RBUFFER_UNTIL_EMPTY for a macro that simplifies this task.
char *rbuffer_read_ptr(RBuffer *buf, size_t *read_count) FUNC_ATTR_NONNULL_ALL
{
if (!buf->size) {
*read_count = 0;
return buf->read_ptr;
}
if (buf->read_ptr < buf->write_ptr) {
*read_count = (size_t)(buf->write_ptr - buf->read_ptr);
} else {
*read_count = (size_t)(buf->end_ptr - buf->read_ptr);
}
return buf->read_ptr;
}
/// Adjust `rbuffer` read pointer to reflect consumed data. This is called
/// automatically by `rbuffer_read`, but when using `rbuffer_read_ptr`
/// directly, this needs to called after the data was copied from the internal
/// buffer. The read pointer will be wrapped if required.
void rbuffer_consumed(RBuffer *buf, size_t count)
FUNC_ATTR_NONNULL_ALL
{
if (count == 0) {
return;
}
assert(count <= buf->size);
buf->read_ptr += count;
if (buf->read_ptr >= buf->end_ptr) {
buf->read_ptr -= rbuffer_capacity(buf);
}
bool was_full = buf->size == rbuffer_capacity(buf);
buf->size -= count;
if (buf->nonfull_cb && was_full) {
buf->nonfull_cb(buf, buf->data);
}
}
/// Use instead of rbuffer_consumed to use rbuffer in a linear, non-cyclic fashion.
///
/// This is generally useful if we can guarantee to parse all input
/// except some small incomplete token, like when parsing msgpack.
void rbuffer_consumed_compact(RBuffer *buf, size_t count)
FUNC_ATTR_NONNULL_ALL
{
assert(buf->read_ptr <= buf->write_ptr);
rbuffer_consumed(buf, count);
if (buf->read_ptr > buf->start_ptr) {
assert((size_t)(buf->write_ptr - buf->read_ptr) == buf->size
|| buf->write_ptr == buf->start_ptr);
memmove(buf->start_ptr, buf->read_ptr, buf->size);
buf->read_ptr = buf->start_ptr;
buf->write_ptr = buf->read_ptr + buf->size;
}
}
// Higher level functions for copying from/to RBuffer instances and data
// pointers
size_t rbuffer_write(RBuffer *buf, const char *src, size_t src_size)
FUNC_ATTR_NONNULL_ALL
{
size_t size = src_size;
RBUFFER_UNTIL_FULL(buf, wptr, wcnt) {
size_t copy_count = MIN(src_size, wcnt);
memcpy(wptr, src, copy_count);
rbuffer_produced(buf, copy_count);
if (!(src_size -= copy_count)) {
return size;
}
src += copy_count;
}
return size - src_size;
}
size_t rbuffer_read(RBuffer *buf, char *dst, size_t dst_size)
FUNC_ATTR_NONNULL_ALL
{
size_t size = dst_size;
RBUFFER_UNTIL_EMPTY(buf, rptr, rcnt) {
size_t copy_count = MIN(dst_size, rcnt);
memcpy(dst, rptr, copy_count);
rbuffer_consumed(buf, copy_count);
if (!(dst_size -= copy_count)) {
return size;
}
dst += copy_count;
}
return size - dst_size;
}
char *rbuffer_get(RBuffer *buf, size_t index)
FUNC_ATTR_NONNULL_ALL FUNC_ATTR_NONNULL_RET
{
assert(index < buf->size);
char *rptr = buf->read_ptr + index;
if (rptr >= buf->end_ptr) {
rptr -= rbuffer_capacity(buf);
}
return rptr;
}
int rbuffer_cmp(RBuffer *buf, const char *str, size_t count)
FUNC_ATTR_NONNULL_ALL
{
assert(count <= buf->size);
size_t rcnt;
rbuffer_read_ptr(buf, &rcnt);
size_t n = MIN(count, rcnt);
int rv = memcmp(str, buf->read_ptr, n);
count -= n;
size_t remaining = buf->size - rcnt;
if (rv || !count || !remaining) {
return rv;
}
return memcmp(str + n, buf->start_ptr, count);
}

View File

@ -1,71 +0,0 @@
// Specialized ring buffer. This is basically an array that wraps read/write
// pointers around the memory region. It should be more efficient than the old
// RBuffer which required memmove() calls to relocate read/write positions.
//
// The main purpose of RBuffer is simplify memory management when reading from
// uv_stream_t instances:
//
// - The event loop writes data to a RBuffer, advancing the write pointer
// - The main loop reads data, advancing the read pointer
// - If the buffer becomes full(size == capacity) the rstream is temporarily
// stopped(automatic backpressure handling)
//
// Reference: http://en.wikipedia.org/wiki/Circular_buffer
#pragma once
#include <stddef.h>
#include <stdint.h>
#include "nvim/rbuffer_defs.h" // IWYU pragma: keep
// Macros that simplify working with the read/write pointers directly by hiding
// ring buffer wrap logic. Some examples:
//
// - Pass the write pointer to a function(write_data) that incrementally
// produces data, returning the number of bytes actually written to the
// ring buffer:
//
// RBUFFER_UNTIL_FULL(rbuf, ptr, cnt)
// rbuffer_produced(rbuf, write_data(state, ptr, cnt));
//
// - Pass the read pointer to a function(read_data) that incrementally
// consumes data, returning the number of bytes actually read from the
// ring buffer:
//
// RBUFFER_UNTIL_EMPTY(rbuf, ptr, cnt)
// rbuffer_consumed(rbuf, read_data(state, ptr, cnt));
//
// Note that the rbuffer_{produced,consumed} calls are necessary or these macros
// create infinite loops
#define RBUFFER_UNTIL_EMPTY(buf, rptr, rcnt) \
for (size_t rcnt = 0, _r = 1; _r; _r = 0) \
for (char *rptr = rbuffer_read_ptr(buf, &rcnt); \
buf->size; \
rptr = rbuffer_read_ptr(buf, &rcnt))
#define RBUFFER_UNTIL_FULL(buf, wptr, wcnt) \
for (size_t wcnt = 0, _r = 1; _r; _r = 0) \
for (char *wptr = rbuffer_write_ptr(buf, &wcnt); \
rbuffer_space(buf); \
wptr = rbuffer_write_ptr(buf, &wcnt))
// Iteration
#define RBUFFER_EACH(buf, c, i) \
for (size_t i = 0; \
i < buf->size; \
i = buf->size) \
for (char c = 0; \
i < buf->size ? ((int)(c = *rbuffer_get(buf, i))) || 1 : 0; \
i++)
#define RBUFFER_EACH_REVERSE(buf, c, i) \
for (size_t i = buf->size; \
i != SIZE_MAX; \
i = SIZE_MAX) \
for (char c = 0; \
i-- > 0 ? ((int)(c = *rbuffer_get(buf, i))) || 1 : 0; \
)
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "rbuffer.h.generated.h"
#endif

View File

@ -1,45 +0,0 @@
#pragma once
#include <stddef.h>
#include "nvim/func_attr.h"
typedef struct rbuffer RBuffer;
/// Type of function invoked during certain events:
/// - When the RBuffer switches to the full state
/// - When the RBuffer switches to the non-full state
typedef void (*rbuffer_callback)(RBuffer *buf, void *data);
struct rbuffer {
rbuffer_callback full_cb, nonfull_cb;
void *data;
size_t size;
// helper memory used to by rbuffer_reset if required
char *temp;
char *end_ptr, *read_ptr, *write_ptr;
char start_ptr[];
};
static inline size_t rbuffer_size(RBuffer *buf)
REAL_FATTR_ALWAYS_INLINE REAL_FATTR_NONNULL_ALL;
static inline size_t rbuffer_size(RBuffer *buf)
{
return buf->size;
}
static inline size_t rbuffer_capacity(RBuffer *buf)
REAL_FATTR_ALWAYS_INLINE REAL_FATTR_NONNULL_ALL;
static inline size_t rbuffer_capacity(RBuffer *buf)
{
return (size_t)(buf->end_ptr - buf->start_ptr);
}
static inline size_t rbuffer_space(RBuffer *buf)
REAL_FATTR_ALWAYS_INLINE REAL_FATTR_NONNULL_ALL;
static inline size_t rbuffer_space(RBuffer *buf)
{
return rbuffer_capacity(buf) - buf->size;
}

View File

@ -15,7 +15,6 @@
#include "nvim/option_vars.h"
#include "nvim/os/os.h"
#include "nvim/os/os_defs.h"
#include "nvim/rbuffer.h"
#include "nvim/strings.h"
#include "nvim/tui/input.h"
#include "nvim/tui/input_defs.h"
@ -153,7 +152,7 @@ void tinput_init(TermInput *input, Loop *loop)
termkey_set_canonflags(input->tk, curflags | TERMKEY_CANON_DELBS);
// setup input handle
rstream_init_fd(loop, &input->read_stream, input->in_fd, READ_STREAM_SIZE);
rstream_init_fd(loop, &input->read_stream, input->in_fd);
// initialize a timer handle for handling ESC with libtermkey
uv_timer_init(&loop->uv, &input->timer_handle);
@ -211,9 +210,9 @@ static void tinput_flush(TermInput *input)
input->key_buffer_len = 0;
}
static void tinput_enqueue(TermInput *input, char *buf, size_t size)
static void tinput_enqueue(TermInput *input, const char *buf, size_t size)
{
if (input->key_buffer_len > KEY_BUFFER_SIZE - 0xff) {
if (input->key_buffer_len > KEY_BUFFER_SIZE - size) {
// don't ever let the buffer get too full or we risk putting incomplete keys into it
tinput_flush(input);
}
@ -463,8 +462,10 @@ static void tinput_timer_cb(uv_timer_t *handle)
TermInput *input = handle->data;
// If the raw buffer is not empty, process the raw buffer first because it is
// processing an incomplete bracketed paster sequence.
if (rbuffer_size(input->read_stream.buffer)) {
handle_raw_buffer(input, true);
size_t size = rstream_available(&input->read_stream);
if (size) {
size_t consumed = handle_raw_buffer(input, true, input->read_stream.read_pos, size);
rstream_consume(&input->read_stream, consumed);
}
tk_getkeys(input, true);
tinput_flush(input);
@ -478,39 +479,37 @@ static void tinput_timer_cb(uv_timer_t *handle)
///
/// @param input the input stream
/// @return true iff handle_focus_event consumed some input
static bool handle_focus_event(TermInput *input)
static size_t handle_focus_event(TermInput *input, const char *ptr, size_t size)
{
if (rbuffer_size(input->read_stream.buffer) > 2
&& (!rbuffer_cmp(input->read_stream.buffer, "\x1b[I", 3)
|| !rbuffer_cmp(input->read_stream.buffer, "\x1b[O", 3))) {
bool focus_gained = *rbuffer_get(input->read_stream.buffer, 2) == 'I';
// Advance past the sequence
rbuffer_consumed(input->read_stream.buffer, 3);
if (size >= 3
&& (!memcmp(ptr, "\x1b[I", 3)
|| !memcmp(ptr, "\x1b[O", 3))) {
bool focus_gained = ptr[2] == 'I';
MAXSIZE_TEMP_ARRAY(args, 1);
ADD_C(args, BOOLEAN_OBJ(focus_gained));
rpc_send_event(ui_client_channel_id, "nvim_ui_set_focus", args);
return true;
return 3; // Advance past the sequence
}
return false;
return 0;
}
#define START_PASTE "\x1b[200~"
#define END_PASTE "\x1b[201~"
static HandleState handle_bracketed_paste(TermInput *input)
static size_t handle_bracketed_paste(TermInput *input, const char *ptr, size_t size,
bool *incomplete)
{
size_t buf_size = rbuffer_size(input->read_stream.buffer);
if (buf_size > 5
&& (!rbuffer_cmp(input->read_stream.buffer, START_PASTE, 6)
|| !rbuffer_cmp(input->read_stream.buffer, END_PASTE, 6))) {
bool enable = *rbuffer_get(input->read_stream.buffer, 4) == '0';
if (size >= 6
&& (!memcmp(ptr, START_PASTE, 6)
|| !memcmp(ptr, END_PASTE, 6))) {
bool enable = ptr[4] == '0';
if (input->paste && enable) {
return kNotApplicable; // Pasting "start paste" code literally.
return 0; // Pasting "start paste" code literally.
}
// Advance past the sequence
rbuffer_consumed(input->read_stream.buffer, 6);
if (!!input->paste == enable) {
return kComplete; // Spurious "disable paste" code.
return 6; // Spurious "disable paste" code.
}
if (enable) {
@ -525,15 +524,15 @@ static HandleState handle_bracketed_paste(TermInput *input)
// Paste phase: "disabled".
input->paste = 0;
}
return kComplete;
} else if (buf_size < 6
&& (!rbuffer_cmp(input->read_stream.buffer, START_PASTE, buf_size)
|| !rbuffer_cmp(input->read_stream.buffer,
END_PASTE, buf_size))) {
return 6;
} else if (size < 6
&& (!memcmp(ptr, START_PASTE, size)
|| !memcmp(ptr, END_PASTE, size))) {
// Wait for further input, as the sequence may be split.
return kIncomplete;
*incomplete = true;
return 0;
}
return kNotApplicable;
return 0;
}
/// Handle an OSC or DCS response sequence from the terminal.
@ -644,20 +643,31 @@ static void handle_unknown_csi(TermInput *input, const TermKeyKey *key)
}
}
static void handle_raw_buffer(TermInput *input, bool force)
static size_t handle_raw_buffer(TermInput *input, bool force, const char *data, size_t size)
{
HandleState is_paste = kNotApplicable;
const char *ptr = data;
do {
if (!force
&& (handle_focus_event(input)
|| (is_paste = handle_bracketed_paste(input)) != kNotApplicable)) {
if (is_paste == kIncomplete) {
if (!force) {
size_t consumed = handle_focus_event(input, ptr, size);
if (consumed) {
ptr += consumed;
size -= consumed;
continue;
}
bool incomplete = false;
consumed = handle_bracketed_paste(input, ptr, size, &incomplete);
if (incomplete) {
assert(consumed == 0);
// Wait for the next input, leaving it in the raw buffer due to an
// incomplete sequence.
return;
return (size_t)(ptr - data);
} else if (consumed) {
ptr += consumed;
size -= consumed;
continue;
}
continue;
}
//
@ -666,55 +676,47 @@ static void handle_raw_buffer(TermInput *input, bool force)
// calls (above) depend on this.
//
size_t count = 0;
RBUFFER_EACH(input->read_stream.buffer, c, i) {
for (size_t i = 0; i < size; i++) {
count = i + 1;
if (c == '\x1b' && count > 1) {
if (ptr[i] == '\x1b' && count > 1) {
count--;
break;
}
}
// Push bytes directly (paste).
if (input->paste) {
RBUFFER_UNTIL_EMPTY(input->read_stream.buffer, ptr, len) {
size_t consumed = MIN(count, len);
assert(consumed <= input->read_stream.buffer->size);
tinput_enqueue(input, ptr, consumed);
rbuffer_consumed(input->read_stream.buffer, consumed);
if (!(count -= consumed)) {
break;
}
}
tinput_enqueue(input, ptr, count);
ptr += count;
size -= count;
continue;
}
// Push through libtermkey (translates to "<keycode>" strings, etc.).
RBUFFER_UNTIL_EMPTY(input->read_stream.buffer, ptr, len) {
const size_t size = MIN(count, len);
if (size > termkey_get_buffer_remaining(input->tk)) {
{
const size_t to_use = MIN(count, size);
if (to_use > termkey_get_buffer_remaining(input->tk)) {
// We are processing a very long escape sequence. Increase termkey's
// internal buffer size. We don't handle out of memory situations so
// abort if it fails
const size_t delta = size - termkey_get_buffer_remaining(input->tk);
const size_t delta = to_use - termkey_get_buffer_remaining(input->tk);
const size_t bufsize = termkey_get_buffer_size(input->tk);
if (!termkey_set_buffer_size(input->tk, MAX(bufsize + delta, bufsize * 2))) {
abort();
}
}
size_t consumed = termkey_push_bytes(input->tk, ptr, size);
size_t consumed = termkey_push_bytes(input->tk, ptr, to_use);
// We resize termkey's buffer when it runs out of space, so this should
// never happen
assert(consumed <= rbuffer_size(input->read_stream.buffer));
rbuffer_consumed(input->read_stream.buffer, consumed);
assert(consumed <= to_use);
ptr += consumed;
size -= consumed;
// Process the input buffer now for any keys
tk_getkeys(input, false);
if (!(count -= consumed)) {
break;
}
}
} while (rbuffer_size(input->read_stream.buffer));
} while (size);
const size_t tk_size = termkey_get_buffer_size(input->tk);
const size_t tk_remaining = termkey_get_buffer_remaining(input->tk);
@ -726,23 +728,25 @@ static void handle_raw_buffer(TermInput *input, bool force)
abort();
}
}
return (size_t)(ptr - data);
}
static void tinput_read_cb(RStream *stream, RBuffer *buf, size_t count_, void *data, bool eof)
static size_t tinput_read_cb(RStream *stream, const char *buf, size_t count_, void *data, bool eof)
{
TermInput *input = data;
size_t consumed = handle_raw_buffer(input, false, buf, count_);
tinput_flush(input);
if (eof) {
loop_schedule_fast(&main_loop, event_create(tinput_done_event, NULL));
return;
return consumed;
}
handle_raw_buffer(input, false);
tinput_flush(input);
// An incomplete sequence was found. Leave it in the raw buffer and wait for
// the next input.
if (rbuffer_size(input->read_stream.buffer)) {
if (consumed < count_) {
// If 'ttimeout' is not set, start the timer with a timeout of 0 to process
// the next input.
int64_t ms = input->ttimeout
@ -750,11 +754,7 @@ static void tinput_read_cb(RStream *stream, RBuffer *buf, size_t count_, void *d
// Stop the current timer if already running
uv_timer_stop(&input->timer_handle);
uv_timer_start(&input->timer_handle, tinput_timer_cb, (uint32_t)ms, 0);
return;
}
// Make sure the next input escape sequence fits into the ring buffer without
// wraparound, else it could be misinterpreted (because rbuffer_read_ptr()
// exposes the underlying buffer to callers unaware of the wraparound).
rbuffer_reset(input->read_stream.buffer);
return consumed;
}

View File

@ -5,7 +5,6 @@
#include <uv.h>
#include "nvim/event/defs.h"
#include "nvim/rbuffer_defs.h"
#include "nvim/tui/input_defs.h" // IWYU pragma: keep
#include "nvim/tui/tui_defs.h"
#include "nvim/types_defs.h"
@ -17,7 +16,7 @@ typedef enum {
kKeyEncodingXterm, ///< Xterm's modifyOtherKeys encoding (XTMODKEYS)
} KeyEncoding;
#define KEY_BUFFER_SIZE 0xfff
#define KEY_BUFFER_SIZE 0x1000
typedef struct {
int in_fd;
// Phases: -1=all 0=disabled 1=first-chunk 2=continue 3=last-chunk
@ -40,12 +39,6 @@ typedef struct {
size_t key_buffer_len;
} TermInput;
typedef enum {
kIncomplete = -1,
kNotApplicable = 0,
kComplete = 1,
} HandleState;
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "tui/input.h.generated.h"
#endif

View File

@ -1,28 +0,0 @@
#include "nvim/rbuffer.h"
#include "rbuffer.h"
void ut_rbuffer_each_read_chunk(RBuffer *buf, each_ptr_cb cb)
{
RBUFFER_UNTIL_EMPTY(buf, rptr, rcnt) {
cb(rptr, rcnt);
rbuffer_consumed(buf, rcnt);
}
}
void ut_rbuffer_each_write_chunk(RBuffer *buf, each_ptr_cb cb)
{
RBUFFER_UNTIL_FULL(buf, wptr, wcnt) {
cb(wptr, wcnt);
rbuffer_produced(buf, wcnt);
}
}
void ut_rbuffer_each(RBuffer *buf, each_cb cb)
{
RBUFFER_EACH(buf, c, i) cb(c, i);
}
void ut_rbuffer_each_reverse(RBuffer *buf, each_cb cb)
{
RBUFFER_EACH_REVERSE(buf, c, i) cb(c, i);
}

View File

@ -1,9 +0,0 @@
#include "nvim/rbuffer.h"
typedef void(*each_ptr_cb)(char *ptr, size_t cnt);
typedef void(*each_cb)(char c, size_t i);
void ut_rbuffer_each_read_chunk(RBuffer *buf, each_ptr_cb cb);
void ut_rbuffer_each_write_chunk(RBuffer *buf, each_ptr_cb cb);
void ut_rbuffer_each(RBuffer *buf, each_cb cb);
void ut_rbuffer_each_reverse(RBuffer *buf, each_cb cb);

View File

@ -1,340 +0,0 @@
local t = require('test.unit.testutil')
local itp = t.gen_itp(it)
local eq = t.eq
local ffi = t.ffi
local cstr = t.cstr
local to_cstr = t.to_cstr
local child_call_once = t.child_call_once
local rbuffer = t.cimport('./test/unit/fixtures/rbuffer.h')
describe('rbuffer functions', function()
local capacity = 16
local rbuf
local function inspect()
return ffi.string(rbuf.start_ptr, capacity)
end
local function write(str)
local buf = to_cstr(str)
return rbuffer.rbuffer_write(rbuf, buf, #str)
end
local function read(len)
local buf = cstr(len)
len = rbuffer.rbuffer_read(rbuf, buf, len)
return ffi.string(buf, len)
end
local function get(idx)
return ffi.string(rbuffer.rbuffer_get(rbuf, idx), 1)
end
before_each(function()
child_call_once(function()
rbuf = ffi.gc(rbuffer.rbuffer_new(capacity), rbuffer.rbuffer_free)
-- fill the internal buffer with the character '0' to simplify inspecting
ffi.C.memset(rbuf.start_ptr, string.byte('0'), capacity)
end)
end)
describe('RBUFFER_UNTIL_FULL', function()
local chunks
local function collect_write_chunks()
rbuffer.ut_rbuffer_each_write_chunk(rbuf, function(wptr, wcnt)
table.insert(chunks, ffi.string(wptr, wcnt))
end)
end
before_each(function()
chunks = {}
end)
describe('with empty buffer in one contiguous chunk', function()
itp('is called once with the empty chunk', function()
collect_write_chunks()
eq({ '0000000000000000' }, chunks)
end)
end)
describe('with partially empty buffer in one contiguous chunk', function()
itp('is called once with the empty chunk', function()
write('string')
collect_write_chunks()
eq({ '0000000000' }, chunks)
end)
end)
describe('with filled buffer in one contiguous chunk', function()
itp('is not called', function()
write('abcdefghijklmnopq')
collect_write_chunks()
eq({}, chunks)
end)
end)
describe('with buffer partially empty in two contiguous chunks', function()
itp('is called twice with each filled chunk', function()
write('1234567890')
read(8)
collect_write_chunks()
eq({ '000000', '12345678' }, chunks)
end)
end)
describe('with buffer empty in two contiguous chunks', function()
itp('is called twice with each filled chunk', function()
write('12345678')
read(8)
collect_write_chunks()
eq({ '00000000', '12345678' }, chunks)
end)
end)
describe('with buffer filled in two contiguous chunks', function()
itp('is not called', function()
write('12345678')
read(8)
write('abcdefghijklmnopq')
collect_write_chunks()
eq({}, chunks)
end)
end)
end)
describe('RBUFFER_UNTIL_EMPTY', function()
local chunks
local function collect_read_chunks()
rbuffer.ut_rbuffer_each_read_chunk(rbuf, function(rptr, rcnt)
table.insert(chunks, ffi.string(rptr, rcnt))
end)
end
before_each(function()
chunks = {}
end)
describe('with empty buffer', function()
itp('is not called', function()
collect_read_chunks()
eq({}, chunks)
end)
end)
describe('with partially filled buffer in one contiguous chunk', function()
itp('is called once with the filled chunk', function()
write('string')
collect_read_chunks()
eq({ 'string' }, chunks)
end)
end)
describe('with filled buffer in one contiguous chunk', function()
itp('is called once with the filled chunk', function()
write('abcdefghijklmnopq')
collect_read_chunks()
eq({ 'abcdefghijklmnop' }, chunks)
end)
end)
describe('with buffer partially filled in two contiguous chunks', function()
itp('is called twice with each filled chunk', function()
write('1234567890')
read(10)
write('long string')
collect_read_chunks()
eq({ 'long s', 'tring' }, chunks)
end)
end)
describe('with buffer filled in two contiguous chunks', function()
itp('is called twice with each filled chunk', function()
write('12345678')
read(8)
write('abcdefghijklmnopq')
collect_read_chunks()
eq({ 'abcdefgh', 'ijklmnop' }, chunks)
end)
end)
end)
describe('RBUFFER_EACH', function()
local chars
local function collect_chars()
rbuffer.ut_rbuffer_each(rbuf, function(c, i)
table.insert(chars, { string.char(c), tonumber(i) })
end)
end
before_each(function()
chars = {}
end)
describe('with empty buffer', function()
itp('is not called', function()
collect_chars()
eq({}, chars)
end)
end)
describe('with buffer filled in two contiguous chunks', function()
itp('collects each character and index', function()
write('1234567890')
read(10)
write('long string')
collect_chars()
eq({
{ 'l', 0 },
{ 'o', 1 },
{ 'n', 2 },
{ 'g', 3 },
{ ' ', 4 },
{ 's', 5 },
{ 't', 6 },
{ 'r', 7 },
{ 'i', 8 },
{ 'n', 9 },
{ 'g', 10 },
}, chars)
end)
end)
end)
describe('RBUFFER_EACH_REVERSE', function()
local chars
local function collect_chars()
rbuffer.ut_rbuffer_each_reverse(rbuf, function(c, i)
table.insert(chars, { string.char(c), tonumber(i) })
end)
end
before_each(function()
chars = {}
end)
describe('with empty buffer', function()
itp('is not called', function()
collect_chars()
eq({}, chars)
end)
end)
describe('with buffer filled in two contiguous chunks', function()
itp('collects each character and index', function()
write('1234567890')
read(10)
write('long string')
collect_chars()
eq({
{ 'g', 10 },
{ 'n', 9 },
{ 'i', 8 },
{ 'r', 7 },
{ 't', 6 },
{ 's', 5 },
{ ' ', 4 },
{ 'g', 3 },
{ 'n', 2 },
{ 'o', 1 },
{ 'l', 0 },
}, chars)
end)
end)
end)
describe('rbuffer_cmp', function()
local function cmp(str)
local rv = rbuffer.rbuffer_cmp(rbuf, to_cstr(str), #str)
if rv == 0 then
return 0
else
return rv / math.abs(rv)
end
end
describe('with buffer filled in two contiguous chunks', function()
itp('compares the common longest sequence', function()
write('1234567890')
read(10)
write('long string')
eq(0, cmp('long string'))
eq(0, cmp('long strin'))
eq(-1, cmp('long striM'))
eq(1, cmp('long strio'))
eq(0, cmp('long'))
eq(-1, cmp('lonG'))
eq(1, cmp('lonh'))
end)
end)
describe('with empty buffer', function()
itp('returns 0 since no characters are compared', function()
eq(0, cmp(''))
end)
end)
end)
describe('rbuffer_write', function()
itp('fills the internal buffer and returns the write count', function()
eq(12, write('short string'))
eq('short string0000', inspect())
end)
itp('wont write beyond capacity', function()
eq(16, write('very very long string'))
eq('very very long s', inspect())
end)
end)
describe('rbuffer_read', function()
itp('reads what was previously written', function()
write('to read')
eq('to read', read(20))
end)
itp('reads nothing if the buffer is empty', function()
eq('', read(20))
write('empty')
eq('empty', read(20))
eq('', read(20))
end)
end)
describe('rbuffer_get', function()
itp('fetch the pointer at offset, wrapping if required', function()
write('1234567890')
read(10)
write('long string')
eq('l', get(0))
eq('o', get(1))
eq('n', get(2))
eq('g', get(3))
eq(' ', get(4))
eq('s', get(5))
eq('t', get(6))
eq('r', get(7))
eq('i', get(8))
eq('n', get(9))
eq('g', get(10))
end)
end)
describe('wrapping behavior', function()
itp('writing/reading wraps across the end of the internal buffer', function()
write('1234567890')
eq('1234', read(4))
eq('5678', read(4))
write('987654321')
eq('3214567890987654', inspect())
eq('90987654321', read(20))
eq('', read(4))
write('abcdefghijklmnopqrs')
eq('nopabcdefghijklm', inspect())
eq('abcdefghijklmnop', read(20))
end)
end)
end)