WvStreams
wvistreamlist.cc
1/*
2 * Worldvisions Weaver Software:
3 * Copyright (C) 1997-2002 Net Integration Technologies, Inc.
4 *
5 * WvIStreamList holds a list of IWvStream objects -- and its select() and
6 * callback() functions know how to handle multiple simultaneous streams.
7 */
8#include "wvistreamlist.h"
9#include "wvstringlist.h"
10#include "wvstreamsdebugger.h"
11#include "wvstrutils.h"
12
13#include "wvassert.h"
14#include "wvstrutils.h"
15
16#ifndef _WIN32
17#include "wvfork.h"
18#endif
19
20#ifdef HAVE_VALGRIND_MEMCHECK_H
21#include <valgrind/memcheck.h>
22#else
23#define RUNNING_ON_VALGRIND false
24#endif
25
26// enable this to add some read/write trace messages (this can be VERY
27// verbose)
28#define STREAMTRACE 0
29#if STREAMTRACE
30# define TRACE(x, y...) fprintf(stderr, x, ## y)
31#else
32#ifndef _MSC_VER
33# define TRACE(x, y...)
34#else
35# define TRACE
36#endif
37#endif
38
39WvIStreamList WvIStreamList::globallist;
40
41
42WvIStreamList::WvIStreamList():
43 in_select(false), dead_stream(false)
44{
45 readcb = writecb = exceptcb = 0;
46 auto_prune = true;
47 if (this == &globallist)
48 {
49 globalstream = this;
50#ifndef _WIN32
51 add_wvfork_callback(WvIStreamList::onfork);
52#endif
53 set_wsname("globallist");
54 add_debugger_commands();
55 }
56}
57
58
59WvIStreamList::~WvIStreamList()
60{
61 close();
62}
63
64
66{
67 return WvStream::isok();
68}
69
70
72{
73public:
74 BoolGuard(bool &_guard_bool):
75 guard_bool(_guard_bool)
76 {
77 assert(!guard_bool);
78 guard_bool = true;
79 }
81 {
82 guard_bool = false;
83 }
84private:
85 bool &guard_bool;
86};
87
88
90{
91 //BoolGuard guard(in_select);
92 bool already_sure = false;
93 SelectRequest oldwant = si.wants;
94
95 sure_thing.zap();
96
97 time_t alarmleft = alarm_remaining();
98 if (alarmleft == 0)
99 already_sure = true;
100
101 IWvStream *old_in_stream = WvCrashInfo::in_stream;
102 const char *old_in_stream_id = WvCrashInfo::in_stream_id;
103 WvCrashInfo::InStreamState old_in_stream_state = WvCrashInfo::in_stream_state;
104 WvCrashInfo::in_stream_state = WvCrashInfo::PRE_SELECT;
105
106 Iter i(*this);
107 for (i.rewind(); i.next(); )
108 {
109 IWvStream &s(*i);
110#if I_ENJOY_FORMATTING_STRINGS
111 WvCrashWill will("doing pre_select for \"%s\" (%s)\n%s",
112 i.link->id, ptr2str(&s), wvcrash_read_will());
113#else
114 WvCrashInfo::in_stream = &s;
115 WvCrashInfo::in_stream_id = i.link->id;
116#endif
117 si.wants = oldwant;
118 s.pre_select(si);
119
120 if (!s.isok())
121 already_sure = true;
122
123 TRACE("after pre_select(%s): msec_timeout is %ld\n",
124 i.link->id, (long)si.msec_timeout);
125 }
126
127 WvCrashInfo::in_stream = old_in_stream;
128 WvCrashInfo::in_stream_id = old_in_stream_id;
129 WvCrashInfo::in_stream_state = old_in_stream_state;
130
131 if (alarmleft >= 0 && (alarmleft < si.msec_timeout || si.msec_timeout < 0))
132 si.msec_timeout = alarmleft;
133
134 si.wants = oldwant;
135
136 if (already_sure)
137 si.msec_timeout = 0;
138}
139
140
142{
143 //BoolGuard guard(in_select);
144 bool already_sure = false;
145 SelectRequest oldwant = si.wants;
146
147 time_t alarmleft = alarm_remaining();
148 if (alarmleft == 0)
149 already_sure = true;
150
151 IWvStream *old_in_stream = WvCrashInfo::in_stream;
152 const char *old_in_stream_id = WvCrashInfo::in_stream_id;
153 WvCrashInfo::InStreamState old_in_stream_state = WvCrashInfo::in_stream_state;
154 WvCrashInfo::in_stream_state = WvCrashInfo::POST_SELECT;
155
156 Iter i(*this);
157 for (i.rewind(); i.cur() && i.next(); )
158 {
159 IWvStream &s(*i);
160#if I_ENJOY_FORMATTING_STRINGS
161 WvCrashWill will("doing post_select for \"%s\" (%s)\n%s",
162 i.link->id, ptr2str(&s), wvcrash_read_will());
163#else
164 WvCrashInfo::in_stream = &s;
165 WvCrashInfo::in_stream_id = i.link->id;
166#endif
167
168 si.wants = oldwant;
169 if (s.post_select(si))
170 {
171 TRACE("post_select(%s) was true\n", i.link->id);
172 sure_thing.unlink(&s); // don't add it twice!
173 s.addRef();
174 sure_thing.append(&s, true, i.link->id);
175 }
176 else
177 {
178 TRACE("post_select(%s) was false\n", i.link->id);
179 WvIStreamListBase::Iter j(sure_thing);
180 WvLink* link = j.find(&s);
181
182 wvassert(!link, "stream \"%s\" (%s) was ready in "
183 "pre_select, but not in post_select",
184 link->id, ptr2str(link->data));
185 }
186
187 if (!s.isok())
188 {
189 already_sure = true;
190 if (auto_prune)
191 i.xunlink();
192 }
193 }
194
195 WvCrashInfo::in_stream = old_in_stream;
196 WvCrashInfo::in_stream_id = old_in_stream_id;
197 WvCrashInfo::in_stream_state = old_in_stream_state;
198
199 si.wants = oldwant;
200 return already_sure || !sure_thing.isempty();
201}
202
203
204// distribute the callback() request to all children that select 'true'
206{
207 static int level = 0;
208 const char *id;
209 level++;
210
212
213 TRACE("\n%*sList@%p: (%d sure) ", level, "", this, sure_thing.count());
214
215 IWvStream *old_in_stream = WvCrashInfo::in_stream;
216 const char *old_in_stream_id = WvCrashInfo::in_stream_id;
217 WvCrashInfo::InStreamState old_in_stream_state = WvCrashInfo::in_stream_state;
218 WvCrashInfo::in_stream_state = WvCrashInfo::EXECUTE;
219
220 WvIStreamListBase::Iter i(sure_thing);
221 for (i.rewind(); i.next(); )
222 {
223#if STREAMTRACE
224 WvIStreamListBase::Iter x(*this);
225 if (!x.find(&i()))
226 TRACE("Yikes! %p in sure_thing, but not in main list!\n",
227 i.cur());
228#endif
229 IWvStream &s(*i);
230 s.addRef();
231
232 id = i.link->id;
233
234 TRACE("[%p:%s]", &s, id);
235
236 i.xunlink();
237
238#if DEBUG
239 if (!RUNNING_ON_VALGRIND)
240 {
241 WvString strace_node("%s: %s", s.wstype(), s.wsname());
242 ::write(-1, strace_node, strace_node.len());
243 }
244#endif
245#if I_ENJOY_FORMATTING_STRINGS
246 WvCrashWill my_will("executing stream: %s\n%s",
247 id ? id : "unknown stream",
248 wvcrash_read_will());
249#else
250 WvCrashInfo::in_stream = &s;
251 WvCrashInfo::in_stream_id = id;
252#endif
253
254 s.callback();
255 s.release();
256
257 // list might have changed!
258 i.rewind();
259 }
260
261 WvCrashInfo::in_stream = old_in_stream;
262 WvCrashInfo::in_stream_id = old_in_stream_id;
263 WvCrashInfo::in_stream_state = old_in_stream_state;
264
265 sure_thing.zap();
266
267 level--;
268 TRACE("[DONE %p]\n", this);
269}
270
271#ifndef _WIN32
272void WvIStreamList::onfork(pid_t p)
273{
274 if (p == 0)
275 {
276 // this is a child process: don't inherit the global streamlist
277 globallist.zap(false);
278 }
279}
280#endif
281
282
283void WvIStreamList::add_debugger_commands()
284{
285 WvStreamsDebugger::add_command("globallist", 0, debugger_globallist_run_cb, 0);
286}
287
288
289WvString WvIStreamList::debugger_globallist_run_cb(WvStringParm cmd,
290 WvStringList &args,
291 WvStreamsDebugger::ResultCallback result_cb, void *)
292{
293 debugger_streams_display_header(cmd, result_cb);
294 WvIStreamList::Iter i(globallist);
295 for (i.rewind(); i.next(); )
296 debugger_streams_maybe_display_one_stream(static_cast<WvStream *>(i.ptr()),
297 cmd, args, result_cb);
298
299 return WvString::null;
300}
301
virtual unsigned int addRef()=0
Indicate you are using this object.
virtual unsigned int release()=0
Indicate that you are finished using this object.
virtual bool isok() const =0
By default, returns true if geterr() == 0.
A WvFastString acts exactly like a WvString, but can take (const char *) strings without needing to a...
Definition wvstring.h:94
WvStreamList holds a list of WvStream objects – and its select() and callback() functions know how to...
virtual void pre_select(SelectInfo &si)
pre_select() sets up for eventually calling select().
virtual bool post_select(SelectInfo &si)
post_select() is called after select(), and returns true if this object is now ready.
virtual bool isok() const
return true if the stream is actually usable right now
virtual void execute()
The callback() function calls execute(), and then calls the user- specified callback if one is define...
Unified support for streams, that is, sequences of bytes that may or may not be ready for read/write ...
Definition wvstream.h:25
virtual void execute()
The callback() function calls execute(), and then calls the user- specified callback if one is define...
Definition wvstream.h:652
virtual bool isok() const
return true if the stream is actually usable right now
Definition wvstream.cc:445
virtual size_t write(const void *buf, size_t count)
Write data to the stream.
Definition wvstream.cc:532
virtual void close()
Close the stream if it is open; isok() becomes false from now on.
Definition wvstream.cc:341
time_t alarm_remaining()
return the number of milliseconds remaining before the alarm will go off; -1 means no alarm is set (i...
Definition wvstream.cc:1057
This is a WvList of WvStrings, and is a really handy way to parse strings.
WvString is an implementation of a simple and efficient printable-string class.
Definition wvstring.h:330
the data structure used by pre_select()/post_select() and internally by select().
Definition iwvstream.h:50
A SelectRequest is a convenient way to remember what we want to do to a particular stream: read from ...
Definition iwvstream.h:34
Provides support for forking processes.
void add_wvfork_callback(WvForkCallback cb)
Register a callback to be called during wvfork.
Definition wvfork.cc:51
Various little string functions.
WvString ptr2str(void *ptr)
Converts a pointer into a string, like glibc's p formatter would do.
Definition strutils.cc:1318