]> git.stg.codes - stg.git/blob - libs/common/blockio.cpp
Add subscriptions (to replace notifiers).
[stg.git] / libs / common / blockio.cpp
1 #include "stg/blockio.h"
2
3 namespace
4 {
5
6 void* adjust(void* base, size_t shift)
7 {
8     char* ptr = static_cast<char*>(base);
9     return ptr + shift;
10 }
11
12 } // namspace anonymous
13
14 using STG::BlockReader;
15 using STG::BlockWriter;
16
17 BlockReader::BlockReader(const IOVec& ioVec)
18     : m_dest(ioVec),
19       m_remainder(0)
20 {
21     for (size_t i = 0; i < m_dest.size(); ++i)
22         m_remainder += m_dest[i].iov_len;
23 }
24
25 bool BlockReader::read(int socket)
26 {
27     if (m_remainder == 0)
28         return true;
29
30     size_t offset = m_dest.size() - 1;
31     size_t toRead = m_remainder;
32     while (offset > 0) {
33         if (toRead < m_dest[offset].iov_len)
34             break;
35         toRead -= m_dest[offset].iov_len;
36         --offset;
37     }
38
39     IOVec dest(m_dest.size() - offset);
40     for (size_t i = 0; i < dest.size(); ++i) {
41         if (i == 0) {
42             dest[0].iov_len = toRead;
43             dest[0].iov_base = adjust(m_dest[offset].iov_base, m_dest[offset].iov_len - toRead);
44         } else {
45             dest[i] = m_dest[offset + i];
46         }
47     }
48
49     ssize_t res = readv(socket, dest.data(), dest.size());
50     if (res < 0)
51         return false;
52     if (res == 0)
53         return m_remainder == 0;
54     if (res < static_cast<ssize_t>(m_remainder))
55         m_remainder -= res;
56     else
57         m_remainder = 0;
58     return true;
59 }
60
61 BlockWriter::BlockWriter(const IOVec& ioVec)
62     : m_source(ioVec),
63       m_remainder(0)
64 {
65     for (size_t i = 0; i < m_source.size(); ++i)
66         m_remainder += m_source[i].iov_len;
67 }
68
69 bool BlockWriter::write(int socket)
70 {
71     if (m_remainder == 0)
72         return true;
73
74     size_t offset = m_source.size() - 1;
75     size_t toWrite = m_remainder;
76     while (offset > 0) {
77         if (toWrite < m_source[offset].iov_len)
78             break;
79         toWrite -= m_source[offset].iov_len;
80         --offset;
81     }
82
83     IOVec source(m_source.size() - offset);
84     for (size_t i = 0; i < source.size(); ++i) {
85         if (i == 0) {
86             source[0].iov_len = toWrite;
87             source[0].iov_base = adjust(m_source[offset].iov_base, m_source[offset].iov_len - toWrite);
88         } else {
89             source[i] = m_source[offset + i];
90         }
91     }
92     ssize_t res = writev(socket, source.data(), source.size());
93     if (res < 0)
94         return false;
95     if (res == 0)
96         return m_remainder == 0;
97     if (res < static_cast<ssize_t>(m_remainder))
98         m_remainder -= res;
99     else
100         m_remainder = 0;
101     return true;
102 }