SHOGUN
v2.0.0
|
00001 /* 00002 * This program is free software; you can redistribute it and/or modify 00003 * it under the terms of the GNU General Public License as published by 00004 * the Free Software Foundation; either version 3 of the License, or 00005 * (at your option) any later version. 00006 * 00007 * Written (W) 2011 Shashwat Lal Das 00008 * Copyright (C) 2011 Berlin Institute of Technology and Max-Planck-Society 00009 */ 00010 00011 #include <shogun/lib/common.h> 00012 #include <shogun/lib/DataType.h> 00013 #include <pthread.h> 00014 00015 #ifndef __PARSEBUFFER_H__ 00016 #define __PARSEBUFFER_H__ 00017 00018 namespace shogun 00019 { 00020 00023 enum E_IS_EXAMPLE_USED 00024 { 00025 E_EMPTY = 1, 00026 E_NOT_USED = 2, 00027 E_USED = 3 00028 }; 00029 00039 template <class T> 00040 class Example 00041 { 00042 public: 00044 float64_t label; 00046 SGVector<T> fv; 00047 }; 00048 00065 template <class T> class CParseBuffer: public CSGObject 00066 { 00067 public: 00073 CParseBuffer(int32_t size = 1024); 00074 00079 ~CParseBuffer(); 00080 00087 Example<T>* get_free_example() 00088 { 00089 pthread_mutex_lock(write_lock); 00090 pthread_mutex_lock(&ex_in_use_mutex[ex_write_index]); 00091 while (ex_used[ex_write_index] == E_NOT_USED) 00092 pthread_cond_wait(&ex_in_use_cond[ex_write_index], &ex_in_use_mutex[ex_write_index]); 00093 Example<T>* ex=&ex_ring[ex_write_index]; 00094 pthread_mutex_unlock(&ex_in_use_mutex[ex_write_index]); 00095 pthread_mutex_unlock(write_lock); 00096 00097 return ex; 00098 } 00099 00108 int32_t write_example(Example<T>* ex); 00109 00115 Example<T>* return_example_to_read(); 00116 00122 Example<T>* get_unused_example(); 00123 00132 int32_t copy_example(Example<T>* ex); 00133 00141 void finalize_example(bool free_after_release); 00142 00152 void set_free_vectors_on_destruct(bool destroy) { free_vectors_on_destruct = destroy; } 00153 00158 bool get_free_vectors_on_destruct() { return free_vectors_on_destruct; } 00159 00165 inline virtual const char* get_name() const { return "ParseBuffer"; } 00166 00167 protected: 00172 inline virtual void inc_read_index() 00173 { 00174 ex_read_index=(ex_read_index + 1) % ring_size; 00175 } 00176 00181 inline virtual void inc_write_index() 00182 { 00183 ex_write_index=(ex_write_index + 1) % ring_size; 00184 } 00185 00186 protected: 00187 00189 int32_t ring_size; 00191 Example<T>* ex_ring; 00192 00194 E_IS_EXAMPLE_USED* ex_used; 00196 pthread_mutex_t* ex_in_use_mutex; 00198 pthread_cond_t* ex_in_use_cond; 00200 pthread_mutex_t* read_lock; 00202 pthread_mutex_t* write_lock; 00203 00205 int32_t ex_write_index; 00207 int32_t ex_read_index; 00208 00210 bool free_vectors_on_destruct; 00211 }; 00212 00213 template <class T> CParseBuffer<T>::CParseBuffer(int32_t size) 00214 { 00215 ring_size = size; 00216 ex_ring = SG_CALLOC(Example<T>, ring_size); 00217 ex_used = SG_MALLOC(E_IS_EXAMPLE_USED, ring_size); 00218 ex_in_use_mutex = SG_MALLOC(pthread_mutex_t, ring_size); 00219 ex_in_use_cond = SG_MALLOC(pthread_cond_t, ring_size); 00220 read_lock = new pthread_mutex_t; 00221 write_lock = new pthread_mutex_t; 00222 00223 SG_SINFO("Initialized with ring size: %d.\n", ring_size); 00224 00225 ex_write_index = 0; 00226 ex_read_index = 0; 00227 00228 for (int32_t i=0; i<ring_size; i++) 00229 { 00230 ex_used[i] = E_EMPTY; 00231 ex_ring[i].fv.vector = new T(); 00232 ex_ring[i].fv.vlen = 1; 00233 ex_ring[i].label = FLT_MAX; 00234 00235 pthread_cond_init(&ex_in_use_cond[i], NULL); 00236 pthread_mutex_init(&ex_in_use_mutex[i], NULL); 00237 } 00238 pthread_mutex_init(read_lock, NULL); 00239 pthread_mutex_init(write_lock, NULL); 00240 00241 free_vectors_on_destruct = true; 00242 } 00243 00244 template <class T> CParseBuffer<T>::~CParseBuffer() 00245 { 00246 for (int32_t i=0; i<ring_size; i++) 00247 { 00248 if (ex_ring[i].fv.vector != NULL && free_vectors_on_destruct) 00249 delete ex_ring[i].fv.vector; 00250 pthread_mutex_destroy(&ex_in_use_mutex[i]); 00251 pthread_cond_destroy(&ex_in_use_cond[i]); 00252 } 00253 SG_FREE(ex_ring); 00254 SG_FREE(ex_used); 00255 SG_FREE(ex_in_use_mutex); 00256 SG_FREE(ex_in_use_cond); 00257 00258 delete read_lock; 00259 delete write_lock; 00260 } 00261 00262 template <class T> 00263 int32_t CParseBuffer<T>::write_example(Example<T> *ex) 00264 { 00265 ex_ring[ex_write_index].label = ex->label; 00266 ex_ring[ex_write_index].fv.vector = ex->fv.vector; 00267 ex_ring[ex_write_index].fv.vlen = ex->fv.vlen; 00268 ex_used[ex_write_index] = E_NOT_USED; 00269 inc_write_index(); 00270 00271 return 1; 00272 } 00273 00274 template <class T> 00275 Example<T>* CParseBuffer<T>::return_example_to_read() 00276 { 00277 if (ex_read_index >= 0) 00278 return &ex_ring[ex_read_index]; 00279 else 00280 return NULL; 00281 } 00282 00283 template <class T> 00284 Example<T>* CParseBuffer<T>::get_unused_example() 00285 { 00286 pthread_mutex_lock(read_lock); 00287 00288 Example<T> *ex; 00289 int32_t current_index = ex_read_index; 00290 // Because read index will change after return_example_to_read 00291 00292 pthread_mutex_lock(&ex_in_use_mutex[current_index]); 00293 00294 if (ex_used[current_index] == E_NOT_USED) 00295 ex = return_example_to_read(); 00296 else 00297 ex = NULL; 00298 00299 pthread_mutex_unlock(&ex_in_use_mutex[current_index]); 00300 00301 pthread_mutex_unlock(read_lock); 00302 return ex; 00303 } 00304 00305 template <class T> 00306 int32_t CParseBuffer<T>::copy_example(Example<T> *ex) 00307 { 00308 pthread_mutex_lock(write_lock); 00309 int32_t ret; 00310 int32_t current_index = ex_write_index; 00311 00312 pthread_mutex_lock(&ex_in_use_mutex[current_index]); 00313 while (ex_used[ex_write_index] == E_NOT_USED) 00314 { 00315 pthread_cond_wait(&ex_in_use_cond[ex_write_index], &ex_in_use_mutex[ex_write_index]); 00316 } 00317 00318 ret = write_example(ex); 00319 00320 pthread_mutex_unlock(&ex_in_use_mutex[current_index]); 00321 pthread_mutex_unlock(write_lock); 00322 00323 return ret; 00324 } 00325 00326 template <class T> 00327 void CParseBuffer<T>::finalize_example(bool free_after_release) 00328 { 00329 pthread_mutex_lock(read_lock); 00330 pthread_mutex_lock(&ex_in_use_mutex[ex_read_index]); 00331 ex_used[ex_read_index] = E_USED; 00332 00333 if (free_after_release) 00334 { 00335 SG_DEBUG("Freeing object in ring at index %d and address: %p.\n", 00336 ex_read_index, ex_ring[ex_read_index].fv.vector); 00337 00338 delete ex_ring[ex_read_index].fv.vector; 00339 ex_ring[ex_read_index].fv.vector=NULL; 00340 } 00341 00342 pthread_cond_signal(&ex_in_use_cond[ex_read_index]); 00343 pthread_mutex_unlock(&ex_in_use_mutex[ex_read_index]); 00344 inc_read_index(); 00345 00346 pthread_mutex_unlock(read_lock); 00347 } 00348 00349 } 00350 #endif // __PARSEBUFFER_H__