1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
use std::collections::VecDeque;
use std::io::{self, BufRead};
use std::str::Utf8Error;

use crate::{Match, Search};

/// Incremental search over a buffered byte stream.
///
/// Iterating yields one [`Match`] at a time; invalid UTF-8 and I/O failures
/// surface as [`SearchStreamError`] items.
pub struct StreamSearch<'a, R>
where
    R: BufRead,
{
    /// Search state that receives every chunk read from `reader`.
    search: &'a mut Search,
    /// Buffered source the bytes are pulled from.
    reader: R,
    /// Matches already produced by the search but not yet handed out.
    res_buf: VecDeque<Match>,
    /// Set once the reader reported end of input and the search was finished.
    closed: bool,
}

impl<'a, R: BufRead> StreamSearch<'a, R> {
    pub(super) fn new(search: &'a mut Search, reader: R) -> Self {
        Self {
            search,
            reader,
            res_buf: VecDeque::new(),
            closed: false,
        }
    }
}

impl<'a, R: io::BufRead> Iterator for StreamSearch<'a, R> {
    type Item = Result<Match, SearchStreamError>;

    /// Yields the next match, reading further chunks from the reader as needed.
    ///
    /// Returns:
    /// * `Some(Ok(m))` for each match, in the order the search produced them;
    /// * `Some(Err(_))` when reading fails or the search rejects a chunk;
    /// * `None` once the reader is exhausted and every match flushed by
    ///   `Search::finish` has been handed out.
    ///
    /// A chunk is consumed from the reader as soon as it has been handed to
    /// the search, whether or not the search accepted it. `fill_buf` keeps
    /// returning the same bytes until they are consumed, so leaving a rejected
    /// chunk in place would re-feed it on every later call and yield the same
    /// error forever.
    fn next(&mut self) -> Option<Result<Match, SearchStreamError>> {
        loop {
            // Drain matches queued by a previous chunk before reading more.
            if let Some(found) = self.res_buf.pop_front() {
                return Some(Ok(found));
            }
            if self.closed {
                return None;
            }

            let chunk = match self.reader.fill_buf() {
                Ok(chunk) => chunk,
                // An interrupted read carries no data loss; retry it.
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(e) => return Some(Err(e.into())),
            };

            if chunk.is_empty() {
                // End of input: let the search flush whatever it still holds,
                // then fall through to the drain at the top of the loop.
                self.closed = true;
                self.res_buf = VecDeque::from(self.search.finish());
                continue;
            }

            let len = chunk.len();
            let outcome = self.search.next_bytes(chunk);
            // Single consume point for both the success and the error path.
            self.reader.consume(len);

            match outcome {
                Ok(matches) => self.res_buf = VecDeque::from(matches),
                Err(e) => return Some(Err(e)),
            }
        }
    }
}

/// An error raised while searching a stream
///
/// Implements [`std::error::Error`], so it can be boxed as
/// `Box<dyn std::error::Error>` and inspected through `source()`.
#[derive(Debug)]
pub enum SearchStreamError {
    /// An I/O operation failed; wraps the original [`io::Error`].
    IOError(io::Error),
    /// The input was not valid UTF-8.
    Utf8Error,
}

impl std::fmt::Display for SearchStreamError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // The wrapped error is exposed through `source()` rather than
            // repeated here, so error-chain printers do not show it twice.
            Self::IOError(_) => f.write_str("I/O error while reading the stream"),
            Self::Utf8Error => f.write_str("stream contained invalid UTF-8"),
        }
    }
}

impl std::error::Error for SearchStreamError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::IOError(e) => Some(e),
            Self::Utf8Error => None,
        }
    }
}

impl From<io::Error> for SearchStreamError {
    fn from(e: io::Error) -> Self {
        Self::IOError(e)
    }
}

impl From<Utf8Error> for SearchStreamError {
    fn from(_: Utf8Error) -> Self {
        Self::Utf8Error
    }
}