    <rss version="2.0" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:sy="http://purl.org/rss/1.0/modules/syndication/" xmlns:admin="http://webns.net/mvcb/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:content="http://purl.org/rss/1.0/modules/content/">
     <channel>
        <title>ACCU  :: Queue with Position Reservation</title>
        <link>https://members.accu.org/index.php/articles/1985</link>
        <description>Professionalism in Programming</description>
        <dc:language>en-us</dc:language> 
        <dc:creator>Administrator</dc:creator> 
        <admin:generatorAgent rdf:resource="http://www.xaraya.org" /> 
        <admin:errorReportsTo rdf:resource="mailto:webeditor@accu.org" />
       <sy:updatePeriod>hourly</sy:updatePeriod>
       <sy:updateFrequency>1</sy:updateFrequency>
       <docs>http://backend.userland.com/rss</docs>




<div class="xar-mod-head"><span class="xar-mod-title">Programming Topics + Overload Journal #101 - February 2011</span></div>





<div class="xar-norm xar-standard-box-padding">
   <h1><strong>Title:</strong>&nbsp;Queue with Position Reservation</h1>
<p><strong>Author:</strong>&nbsp;Martin Moene</p>
<p>
<strong>Date:</strong> Sat, 05 February 2011 20:17:47 +00:00</p>
<p><strong>Summary:</strong>&nbsp;Multiple threads can make processing a message queue faster. Eugene Surman needs the right data structure.</p>
<p><strong>Body:</strong>&nbsp;<p>For the past five years I have mostly been developing multi-threaded messaging applications. While they were all quite different, one situation kept recurring: the sequential order of incoming and outgoing messages had to be maintained, even though the messages were handled by multiple threads concurrently and did not necessarily finish processing in the order they were received. I searched for a solution in many ready-made messaging libraries, but did not find anything satisfactory. So I had to develop a solution of my own: the <code>PRQueue</code>, a Queue with Position Reservation (or 'seat reservation').</p>

<p><code>PRQueue</code> is implemented in C++ using two STL <code>deque</code>s and the pthread library. Two simple classes, <code>Mutex</code> and <code>Lock</code>, are used in the example to demonstrate the logic. A sample message is represented by the <code>StringMsg</code> class, and the <code>QueueTest</code> class is used as a test-bed application.</p>
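
<p>The article's own <code>Mutex</code> and <code>Lock</code> classes are in the zip file and are not reproduced in the listings. As a rough idea of what such a pair might look like, here is a minimal sketch built on the pthread mutex calls; the member names and the helper <code>guarded_increment</code> are assumptions made for illustration, not the author's code.</p>

```cpp
#include <cassert>
#include <cstddef>
#include <pthread.h>

// Thin wrapper around a pthread mutex (illustrative sketch only; the
// Mutex class shipped with the article may differ).
class Mutex {
public:
    Mutex()  { pthread_mutex_init(&m_mux, NULL); }
    ~Mutex() { pthread_mutex_destroy(&m_mux); }
    void lock()   { pthread_mutex_lock(&m_mux); }
    void unlock() { pthread_mutex_unlock(&m_mux); }
private:
    Mutex(const Mutex&);             // non-copyable
    Mutex& operator=(const Mutex&);
    pthread_mutex_t m_mux;
};

// Scoped guard: locks in the constructor, unlocks in the destructor,
// so the mutex is released on every exit path from the scope.
class Lock {
public:
    explicit Lock(Mutex& mux) : m_mux(mux) { m_mux.lock(); }
    ~Lock() { m_mux.unlock(); }
private:
    Lock(const Lock&);               // non-copyable
    Lock& operator=(const Lock&);
    Mutex& m_mux;
};

// Hypothetical helper: increments 'counter' while holding 'mux'.
// The lock is released automatically when the function returns.
int guarded_increment(Mutex& mux, int& counter) {
    Lock lk(mux);
    return ++counter;
}
```

<p>Because the guard unlocks in its destructor, a second call to <code>guarded_increment</code> on the same <code>Mutex</code> does not block. This is the pattern the <code>Lock lk( m_mux);</code> lines in the later listings rely on.</p>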

<p>I chose <code>deque</code> as the main building block of the design because it has all the operations needed to implement <code>PRQueue</code> (including <code>operator[]</code>). In particular, it's important that the <code>push_back()</code> and <code>pop_front()</code> operations do not invalidate pointers and references to other elements of the <code>deque</code>.</p>
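
<p>This guarantee can be checked with a few lines of code. The sketch below is an illustration only; the function name <code>pointer_survives_push_and_pop</code> is made up for the example. It takes a pointer to one element, grows the <code>deque</code> at the back, pops a different element from the front, and then confirms that the pointer still refers to the same element.</p>

```cpp
#include <cassert>
#include <deque>

// Returns true if a pointer taken to a deque element still refers to the
// same element after further push_back() and pop_front() calls.
bool pointer_survives_push_and_pop() {
    std::deque<int> q;
    q.push_back(1);
    q.push_back(2);
    int* second = &q[1];             // pointer to the element holding 2

    for (int i = 3; i < 1000; ++i)   // grow the deque at the back
        q.push_back(i);
    q.pop_front();                   // remove a different element (the 1)

    // push_back() invalidates iterators but not pointers or references;
    // pop_front() invalidates only the erased element.
    return *second == 2 && second == &q.front();
}
```

<p>A <code>std::vector</code> would not be suitable here, since its <code>push_back()</code> may reallocate the storage and invalidate every outstanding pointer.</p>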

<p>Here is a simple example of how <code>PRQueue</code> can be used. Let's say we need to log a stream of large multi-field messages. Converting numeric fields to text strings is a slow process that is not mission-critical, so we decide to offload this task to dedicated threads that will generate the log.</p>

<p>Initially, the processing diagram may look like Figure 1.</p>

<table class="sidebartable">
	<tr>
		<td><img style="max-width:40%" src="http://accu.org/content/images/journals/ol101/Surman/Surman-01.png" /></td>
	</tr>
	<tr>
		<td class="title">Figure 1</td>
	</tr>
</table>

<p>Since the core processing of the messages takes place in multiple threads, the messages may be ready in an order that differs from the original input queue order. For example, one thread may take a message off the input queue and go to sleep, while another thread takes the next message, runs to completion and places its processed message in the output queue ahead of the first thread's. As a result, the log entries may appear out of order. We assume that logging must be done after the messages are processed by the core routines.</p>

<p>Listing 1 illustrates this point, using the standard STL <code>queue</code> and three threads. It generates the output shown in Figure 2.</p>

<table class="sidebartable">
	<tr>
		<td>
			<pre class="programlisting">
...
QueueTest quetest(3);
int i1 =0;
for( int i =10000; i; i--) {
  quetest.push( &quot;| %d&quot;, i1++ );
  quetest.push( &quot;- %d&quot;, i1++ );
}
...
			</pre>
		</td>
	</tr>
	<tr>
		<td class="title">Listing 1</td>
	</tr>
</table>

<table class="sidebartable">
	<tr>
		<td>
			<pre class="programlisting">
Th# Time-stamp                  Msg#
1:  101108 15:04:49.576167  -   5243
3:  101108 15:04:49.576170  |   5244
1:  101108 15:04:49.576174  -   5245
3:  101108 15:04:49.576177  |   5246
3:  101108 15:04:49.576182  |   5248   // out
2:  101108 15:04:49.571945  |   4338   // of
1:  101108 15:04:49.576179  -   5247   // order
3:  101108 15:04:49.576188  -   5249
2:  101108 15:04:49.576189  |   5250
1:  101108 15:04:49.576191  -   5251
			</pre>
		</td>
	</tr>
	<tr>
		<td class="title">Figure 2</td>
	</tr>
</table>

<p>Using <code>PRQueue</code> avoids this scenario. It makes sure the order of messages in the output queue matches the order that existed in the input queue, regardless of the order in which the core routines finish processing the messages.</p>

<p>The basic logic behind <code>PRQueue</code> is simple: when the next message is taken off the input queue, the next push-back position, or 'seat', in the output queue is acquired while the input queue's lock is still held. The lock is then released and processing continues. After a message is fully processed, the previously acquired position is used to place the message into the output queue.</p>

<p>Figure 3 shows the output of the previous example rewritten to use <code>PRQueue</code>. The order of the messages in the log is now preserved.</p>

<table class="sidebartable">
	<tr>
		<td>
			<pre class="programlisting">
Th# Time-stamp                  Msg#
2:  101108 15:04:49.571945  |   4338
...
...
1:  101108 15:04:49.576167  -   5243
3:  101108 15:04:49.576170  |   5244
1:  101108 15:04:49.576174  -   5245
3:  101108 15:04:49.576177  |   5246
1:  101108 15:04:49.576179  -   5247
3:  101108 15:04:49.576182  |   5248
3:  101108 15:04:49.576188  -   5249
2:  101108 15:04:49.576189  |   5250
1:  101108 15:04:49.576191  -   5251
			</pre>
		</td>
	</tr>
	<tr>
		<td class="title">Figure 3</td>
	</tr>
</table>

<p><code>PRQueue</code> is constructed using two <code>deque</code>s: 'data' and 'filled'.</p>

<p>An element of the 'filled' <code>deque</code> is an indicator showing that the corresponding position is filled with data and can be popped from <code>PRQueue</code>. A wrapper class, <code>DataQueue</code>, holds the 'data' and 'filled' <code>deque</code>s. The <code>PRQueue</code> methods are for the most part 'mutexed' wrappers of <code>DataQueue</code> methods.</p>

<p>The design allows us to separate and hide the thread-safety code from the actual implementation, so the user does not need to write any locking/unlocking logic.</p>

<p>Let's discuss <code>PRQueue</code>'s functionality in a bit more detail.</p>

<p>The <code>PRQueue</code> <code>pop</code> method does two things: it pops data from the input queue and reserves a push position in the output queue. The <code>push</code> method uses the previously reserved position to save data into the output queue.</p>

<p>For testing <code>PRQueue</code> with multiple threads a function <code>process_msg</code> is executed by every spawned thread. It pops a <code>StringMsg</code> from the input queue, processes the message by calling the <code>StringMsg::process()</code> method, and pushes the message out. (See Listing 2.)</p>

<table class="sidebartable">
	<tr>
		<td>
			<pre class="programlisting">
// The function 'process_msg' is executed by every
// spawned input thread. The signature corresponds
// to the pthread_create 'start_routine' 
// File prqueue.cpp

void* process_msg( void* arg)
{
  int thidx = ++Thidx;
  QueueTest* quetest =(QueueTest*)arg;
  Msg* msg;
  PRQueue&lt; Msg*&gt;::position pos;
  
  cout &lt;&lt; &quot;Input thread=&quot; &lt;&lt; thidx &lt;&lt;
     &quot; started&quot; &lt;&lt; endl;

  for(;;)
  {
    // Wait for the next available message in the
    // input queue, pop it, and reserve the next
    // push position in the output queue
    quetest-&gt;input_que.pop(
       msg, quetest-&gt;output_que, pos);

    // Process message
    msg-&gt;process( thidx);

    // Push processed message into output queue
    // using reserved position
    quetest-&gt;output_que.push( msg, pos);
  }
  return NULL;
}
			</pre>
		</td>
	</tr>
	<tr>
		<td class="title">Listing 2</td>
	</tr>
</table>

<p>The <code>pop</code> method not only waits for the next message to arrive in the input queue, it also checks whether the message is ready to be popped by looking at the front element of the 'filled' queue. If the data is not filled yet, <code>pop</code> goes back to sleep and waits.</p>

<p>Pop logic (Listing 3):</p>

<ul>
	<li>Lock input queue</li>
	<li>If input queue is not empty and front element is filled with data, <code>pop</code> it (otherwise release lock and go to sleep)</li>
	<li>Lock output queue</li>
	<li>Reserve back position in output queue</li>
	<li>Unlock output queue</li>
	<li>Unlock input queue</li>
</ul>

<table class="sidebartable">
	<tr>
		<td>
			<pre class="programlisting">
// Pop data from input queue and reserve position
// in output queue (file prqueue.hpp)
void PRQueue::pop( DATA&amp; data, PRQueue&amp; outque,
   PRQueue::position&amp; pos)
{
  Lock lk( m_mux);
  
  // Waiting for the message in input queue - pop
  // message
  while( true) {
    if( m_que.pop( data))
      break;
    // either message has not arrived or position
    // is not filled
    wait_while_empty();
  }
  // Reserve position in output queue
  outque.reserve_pos( pos);
}

// Reserve the next push-back position in this queue
void PRQueue::reserve_pos(
   PRQueue::position&amp; pos) {
  Lock lk( m_mux);
  m_que.reserve( pos);
}
			</pre>
		</td>
	</tr>
	<tr>
		<td class="title">Listing 3</td>
	</tr>
</table>

<p>The <code>push</code> method copies data to the reserved position of the output queue and sets the 'filled' indicator to true. It also releases threads waiting on a condition variable by sending a notification signal (<span class="filename">prqueue.hpp</span>); see Listing 4.</p>

<table class="sidebartable">
	<tr>
		<td>
			<pre class="programlisting">
// Push data using reserved position into output
// queue (prqueue.hpp)
void PRQueue::push( const DATA&amp; data,
   const PRQueue::position&amp; pos)
{
  Lock lk( m_mux);
  m_que.fill( data, pos);
  notify_not_empty();
}
			</pre>
		</td>
	</tr>
	<tr>
		<td class="title">Listing 4</td>
	</tr>
</table>

<p>Now the messages arrive in the output queue in order. If we want to extend our processing chain further, another <code>PRQueue</code> can be added to the end. In the test case above we don't do that: we use a single output thread simply to read processed messages from the output queue and print them out. In that final step, a 'simple <code>pop</code>' method is used, without the second and third arguments (the reference to the output queue and the position value). See Listing 5.</p>

<table class="sidebartable">
	<tr>
		<td>
			<pre class="programlisting">
// The function 'print_msg' is executed by the
// single final output thread (file prqueue.cpp)
void* print_msg( void* arg)
{
  QueueTest* quetest =(QueueTest*)arg;
  Msg* msg;

  cout &lt;&lt; &quot;Output thread started&quot; &lt;&lt; endl;
  for(;;)
  {
    // pop message from output queue and print it
    quetest-&gt;output_que.pop( msg);
    msg-&gt;print();
    delete msg;
  }
  return NULL;
}
			</pre>
		</td>
	</tr>
	<tr>
		<td class="title">Listing 5</td>
	</tr>
</table>
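
<p>The pipeline of Listings 2 to 5 can be tried in miniature. The sketch below is not the author's code: it is a cut-down queue for <code>int</code> messages, it substitutes C++11 <code>std::mutex</code>, <code>std::condition_variable</code> and <code>std::thread</code> for the pthread-based classes, and the names <code>MiniPRQueue</code> and <code>run_pipeline</code> are invented for the example. Workers are stopped with a <code>-1</code> sentinel, which is also an assumption of the sketch; the article's threads loop forever.</p>

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical miniature of PRQueue for int messages (not the article's code).
class MiniPRQueue {
public:
    struct position { int* data; bool* filled; };

    // Plain push: append an element that is already filled.
    void push(int v) {
        std::lock_guard<std::mutex> lk(m_mux);
        m_data.push_back(v);
        m_filled.push_back(true);
        m_cond.notify_all();
    }
    // Push into a previously reserved seat.
    void push(int v, const position& pos) {
        std::lock_guard<std::mutex> lk(m_mux);
        *pos.data = v;
        *pos.filled = true;
        m_cond.notify_all();
    }
    // Simple pop: block until the front element exists and is filled.
    int pop() {
        std::unique_lock<std::mutex> lk(m_mux);
        return pop_locked(lk);
    }
    // Pop from this queue and, still holding its lock, reserve a seat in 'out'.
    int pop(MiniPRQueue& out, position& pos) {
        std::unique_lock<std::mutex> lk(m_mux);
        int v = pop_locked(lk);
        out.reserve(pos);
        return v;
    }

private:
    int pop_locked(std::unique_lock<std::mutex>& lk) {
        m_cond.wait(lk, [this] { return !m_data.empty() && m_filled.front(); });
        int v = m_data.front();
        m_data.pop_front();
        m_filled.pop_front();
        return v;
    }
    void reserve(position& pos) {
        std::lock_guard<std::mutex> lk(m_mux);
        m_data.push_back(0);          // dummy value until the seat is filled
        m_filled.push_back(false);
        pos.data = &m_data.back();
        pos.filled = &m_filled.back();
    }
    std::mutex m_mux;
    std::condition_variable m_cond;
    std::deque<int> m_data;
    std::deque<bool> m_filled;
};

// Sends 0..n_msgs-1 through n_workers threads; returns the output order.
std::vector<int> run_pipeline(int n_msgs, int n_workers) {
    MiniPRQueue in, out;
    std::vector<std::thread> workers;
    for (int w = 0; w < n_workers; ++w) {
        workers.emplace_back([&in, &out] {
            for (;;) {
                MiniPRQueue::position pos;
                int v = in.pop(out, pos);
                if (v >= 0 && v % 2 == 0)   // even messages are "slow"
                    std::this_thread::sleep_for(std::chrono::milliseconds(1));
                out.push(v, pos);
                if (v < 0) return;          // sentinel: pass it on, then stop
            }
        });
    }
    for (int i = 0; i < n_msgs; ++i) in.push(i);
    for (int w = 0; w < n_workers; ++w) in.push(-1);   // one sentinel each

    std::vector<int> result;
    for (int i = 0; i < n_msgs; ++i) result.push_back(out.pop());
    for (std::thread& t : workers) t.join();
    return result;
}
```

<p>Even though the even-numbered messages finish later than their odd-numbered neighbours, the vector returned by <code>run_pipeline</code> lists the messages in input order, because each seat in the output queue was reserved at the moment its message left the input queue.</p>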

<p>Now, let's take a look at the auxiliary class <code>DataQueue</code>.</p>

<p>As mentioned before, <code>DataQueue</code> holds two STL <code>deque</code>s: 'data' and 'filled'. <code>DataQueue</code> also defines the <code>position</code> structure and the methods where the key steps of position reservation and data popping happen.</p>

<p><code>DataQueue</code> is included in <code>PRQueue</code> as the data member <code>m_que</code> (see Listing 6).</p>

<table class="sidebartable">
	<tr>
		<td>
			<pre class="programlisting">
// An auxiliary class DataQueue - holder of 'data'
// and 'filled' deques
template&lt; typename DATA&gt; class DataQueue
{

public:
  typedef typename
     deque&lt; DATA&gt;::pointer data_pointer;
  typedef typename
     deque&lt; bool&gt;::pointer filled_pointer;

  // Structure to hold pointers of reserved
  // position
  struct position {
    position() : data_pnt(0), filled_pnt(0) {} 
    data_pointer data_pnt;
    filled_pointer filled_pnt;
  };

  // Check that the data deque is not empty and
  // that its front element is 'filled'.
  // Copy the front data out, then pop the front
  // elements of both deques
  bool pop( DATA&amp; out) {
    if( m_data_que.empty() || 
       ! m_filled_que.front())
      return false;
    out = m_data_que.front();
    m_data_que.pop_front();
    m_filled_que.pop_front();
    return true;
  }
  // Add dummy elements to the back of both
  // deques.
  // Save pointers of both elements to the output
  // position
  void reserve( position&amp; pos) {
    m_data_que.push_back( m_dummy);
    m_filled_que.push_back( false);
    pos.data_pnt =
       &amp;m_data_que[ m_data_que.size() -1];
    pos.filled_pnt =
       &amp;m_filled_que[ m_filled_que.size() -1];
  }
  // Copy data and set 'filled' indicator by
  // position
  void fill( const DATA&amp; data,
     const position&amp; pos) {
    *pos.data_pnt = data;
    *pos.filled_pnt = true;
  }
  void push( const DATA&amp; data) {
    m_data_que.push_back( data);
    m_filled_que.push_back( true);
  }

private : 
  deque&lt;DATA&gt; m_data_que;
  deque&lt;bool&gt; m_filled_que;
  DATA m_dummy;
};//DataQueue
			</pre>
		</td>
	</tr>
	<tr>
		<td class="title">Listing 6</td>
	</tr>
</table>
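
<p>The interplay of <code>reserve</code>, <code>fill</code> and <code>pop</code> can be exercised without any threads. The sketch below restates Listing 6 in condensed form under the made-up name <code>MiniDataQueue</code> (it uses a default-constructed value instead of the <code>m_dummy</code> member); the driver function <code>reserve_then_fill_out_of_order</code> is likewise an assumption for illustration. Seats are reserved in the order a, b, c, filled in reverse, and then popped.</p>

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <vector>

// Condensed, single-threaded restatement of DataQueue from Listing 6.
template <typename DATA>
class MiniDataQueue {
public:
    struct position {
        position() : data_pnt(0), filled_pnt(0) {}
        DATA* data_pnt;
        bool* filled_pnt;
    };
    // Succeeds only if the front seat exists and has been filled.
    bool pop(DATA& out) {
        if (m_data.empty() || !m_filled.front()) return false;
        out = m_data.front();
        m_data.pop_front();
        m_filled.pop_front();
        return true;
    }
    // Append an unfilled seat and hand back pointers to it.
    void reserve(position& pos) {
        m_data.push_back(DATA());
        m_filled.push_back(false);
        pos.data_pnt = &m_data.back();
        pos.filled_pnt = &m_filled.back();
    }
    // Store data in a reserved seat and mark it as filled.
    void fill(const DATA& data, const position& pos) {
        *pos.data_pnt = data;
        *pos.filled_pnt = true;
    }
private:
    std::deque<DATA> m_data;
    std::deque<bool> m_filled;
};

// Reserve seats for "a", "b", "c" in order, fill them in reverse order,
// and return the sequence that pop() yields.
std::vector<std::string> reserve_then_fill_out_of_order() {
    MiniDataQueue<std::string> q;
    MiniDataQueue<std::string>::position pa, pb, pc;
    q.reserve(pa);
    q.reserve(pb);
    q.reserve(pc);

    std::string s;
    q.fill("c", pc);
    q.fill("b", pb);
    bool popped_early = q.pop(s);    // front seat ("a") is not filled yet
    assert(!popped_early);

    q.fill("a", pa);                 // now the front seat is ready
    std::vector<std::string> popped;
    while (q.pop(s)) popped.push_back(s);
    return popped;
}
```

<p>The result is a, b, c: the order of reservation, not the order of filling. Nothing can be popped while the front seat is still empty, which is what holds back messages that finished early.</p>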

<p>To compile and run the <code>PRQueue</code> test, use the commands in Figure 4.</p>

<table class="sidebartable">
	<tr>
		<td>
			<pre class="programlisting">
   c++ -I. prqueue.cpp -lpthread                # PRQueue test
   c++ -I. prqueue.cpp -lpthread -DSIMPLE_QUE   # SimpleQueue test
   
   # Try long message
   c++ -I. prqueue.cpp -lpthread -DLONG_MSG
   c++ -I. prqueue.cpp -lpthread -DLONG_MSG -DSIMPLE_QUE

   a.out [number-of-messages]
			</pre>
		</td>
	</tr>
	<tr>
		<td class="title">Figure 4</td>
	</tr>
</table>

<h2>Conclusion</h2>

<p>The queue with Position Reservation (<code>PRQueue</code>) presented here could be useful in multi-threaded applications when the order of streaming messages should be preserved. <code>PRQueue</code> will make sure that the order of messages in the output queue exactly matches the order that existed in the input queue, because the next push-back position in the output queue is reserved synchronously with taking the message off the input queue. The reserved spot is later filled with data when the message is done processing and ready.</p>

<h2>Reference</h2>

<p>A zip file containing the code is available at: <a href="http://accu.org/content/journals/ol101/prqueue.zip">http://accu.org/content/journals/ol101/prqueue.zip</a></p>
</p>
</div>
</channel>
</rss>
