SHOGUN
v2.0.0
|
00001 /* 00002 * This program is free software; you can redistribute it and/or modify 00003 * it under the terms of the GNU General Public License as published by 00004 * the Free Software Foundation; either version 3 of the License, or 00005 * (at your option) any later version. 00006 * 00007 * Written (W) 2011 Shashwat Lal Das 00008 * Copyright (C) 2011 Berlin Institute of Technology and Max-Planck-Society 00009 */ 00010 00011 #ifndef __INPUTPARSER_H__ 00012 #define __INPUTPARSER_H__ 00013 00014 #include <shogun/io/SGIO.h> 00015 #include <shogun/io/streaming/StreamingFile.h> 00016 #include <shogun/lib/common.h> 00017 #include <shogun/io/streaming/ParseBuffer.h> 00018 #include <pthread.h> 00019 00020 #define PARSER_DEFAULT_BUFFSIZE 100 00021 00022 namespace shogun 00023 { 00026 enum E_EXAMPLE_TYPE 00027 { 00028 E_LABELLED = 1, 00029 E_UNLABELLED = 2 00030 }; 00031 00080 template <class T> class CInputParser 00081 { 00082 public: 00083 00088 CInputParser(); 00089 00094 ~CInputParser(); 00095 00107 void init(CStreamingFile* input_file, bool is_labelled = true, int32_t size = PARSER_DEFAULT_BUFFSIZE); 00108 00114 bool is_running(); 00115 00122 int32_t get_number_of_features() { return number_of_features; } 00123 00135 void set_read_vector(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len)); 00136 00148 void set_read_vector_and_label(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len, float64_t &label)); 00149 00161 int32_t get_vector_and_label(T* &feature_vector, 00162 int32_t &length, 00163 float64_t &label); 00164 00175 int32_t get_vector_only(T* &feature_vector, int32_t &length); 00176 00183 void set_free_vector_after_release(bool free_vec); 00184 00191 void set_free_vectors_on_destruct(bool destroy); 00192 00198 void start_parser(); 00199 00208 void* main_parse_loop(void* params); 00209 00210 00216 void copy_example_into_buffer(Example<T>* ex); 00217 00224 Example<T>* retrieve_example(); 00225 00238 int32_t get_next_example(T* &feature_vector, 00239 int32_t &length, 00240 float64_t &label); 00241 00250 int32_t get_next_example(T* &feature_vector, 00251 int32_t &length); 00252 00260 void finalize_example(); 00261 00266 void end_parser(); 00267 00270 void exit_parser(); 00271 00277 int32_t get_ring_size() { return ring_size; } 00278 00279 private: 00287 static void* parse_loop_entry_point(void* params); 00288 00289 public: 00290 bool parsing_done; 00291 bool reading_done; 00293 E_EXAMPLE_TYPE example_type; 00295 protected: 00302 void (CStreamingFile::*read_vector) (T* &vec, int32_t &len); 00303 00310 void (CStreamingFile::*read_vector_and_label) (T* &vec, int32_t &len, float64_t &label); 00311 00313 CStreamingFile* input_source; 00314 00316 pthread_t parse_thread; 00317 00319 CParseBuffer<T>* examples_ring; 00320 00322 int32_t number_of_features; 00323 00325 int32_t number_of_vectors_parsed; 00326 00328 int32_t number_of_vectors_read; 00329 00331 Example<T>* current_example; 00332 00334 T* current_feature_vector; 00335 00337 float64_t current_label; 00338 00340 int32_t current_len; 00341 00343 bool free_after_release; 00344 00346 int32_t ring_size; 00347 00349 pthread_mutex_t examples_state_lock; 00350 00352 pthread_cond_t examples_state_changed; 00353 }; 00354 00355 template <class T> 00356 void CInputParser<T>::set_read_vector(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len)) 00357 { 00358 // Set read_vector to point to the function passed as arg 00359 read_vector=func_ptr; 00360 } 00361 00362 template <class T> 00363 void CInputParser<T>::set_read_vector_and_label(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len, float64_t &label)) 00364 { 00365 // Set read_vector_and_label to point to the function passed as arg 00366 read_vector_and_label=func_ptr; 00367 } 00368 00369 template <class T> 00370 CInputParser<T>::CInputParser() 00371 { 00372 //init(NULL, true, PARSER_DEFAULT_BUFFSIZE); 00373 } 00374 00375 template <class T> 00376 CInputParser<T>::~CInputParser() 00377 { 00378 end_parser(); 00379 00380 pthread_mutex_destroy(&examples_state_lock); 00381 pthread_cond_destroy(&examples_state_changed); 00382 00383 delete examples_ring; 00384 } 00385 00386 template <class T> 00387 void CInputParser<T>::init(CStreamingFile* input_file, bool is_labelled, int32_t size) 00388 { 00389 input_source = input_file; 00390 00391 if (is_labelled == true) 00392 example_type = E_LABELLED; 00393 else 00394 example_type = E_UNLABELLED; 00395 00396 examples_ring = new CParseBuffer<T>(size); 00397 00398 parsing_done = false; 00399 reading_done = false; 00400 number_of_vectors_parsed = 0; 00401 number_of_vectors_read = 0; 00402 00403 current_len = -1; 00404 current_label = -1; 00405 current_feature_vector = NULL; 00406 00407 free_after_release=true; 00408 ring_size=size; 00409 00410 pthread_mutex_init(&examples_state_lock, NULL); 00411 pthread_cond_init(&examples_state_changed, NULL); 00412 } 00413 00414 template <class T> 00415 void CInputParser<T>::set_free_vector_after_release(bool free_vec) 00416 { 00417 free_after_release=free_vec; 00418 } 00419 00420 template <class T> 00421 void CInputParser<T>::set_free_vectors_on_destruct(bool destroy) 00422 { 00423 examples_ring->set_free_vectors_on_destruct(destroy); 00424 } 00425 00426 template <class T> 00427 void CInputParser<T>::start_parser() 00428 { 00429 if (is_running()) 00430 { 00431 SG_SERROR("Parser thread is already running! Multiple parse threads not supported.\n"); 00432 } 00433 00434 pthread_create(&parse_thread, NULL, parse_loop_entry_point, this); 00435 } 00436 00437 template <class T> 00438 void* CInputParser<T>::parse_loop_entry_point(void* params) 00439 { 00440 ((CInputParser *) params)->main_parse_loop(params); 00441 00442 return NULL; 00443 } 00444 00445 template <class T> 00446 bool CInputParser<T>::is_running() 00447 { 00448 bool ret; 00449 00450 pthread_mutex_lock(&examples_state_lock); 00451 00452 if (parsing_done) 00453 if (reading_done) 00454 ret = false; 00455 else 00456 ret = true; 00457 else 00458 ret = false; 00459 00460 pthread_mutex_unlock(&examples_state_lock); 00461 return ret; 00462 } 00463 00464 template <class T> 00465 int32_t CInputParser<T>::get_vector_and_label(T* &feature_vector, 00466 int32_t &length, 00467 float64_t &label) 00468 { 00469 (input_source->*read_vector_and_label)(feature_vector, length, label); 00470 00471 if (length < 1) 00472 { 00473 // Problem reading the example 00474 return 0; 00475 } 00476 00477 return 1; 00478 } 00479 00480 template <class T> 00481 int32_t CInputParser<T>::get_vector_only(T* &feature_vector, 00482 int32_t &length) 00483 { 00484 (input_source->*read_vector)(feature_vector, length); 00485 00486 if (length < 1) 00487 { 00488 // Problem reading the example 00489 return 0; 00490 } 00491 00492 return 1; 00493 } 00494 00495 template <class T> 00496 void CInputParser<T>::copy_example_into_buffer(Example<T>* ex) 00497 { 00498 examples_ring->copy_example(ex); 00499 } 00500 00501 template <class T> void* CInputParser<T>::main_parse_loop(void* params) 00502 { 00503 // Read the examples into current_* objects 00504 // Instead of allocating mem for new objects each time 00505 #ifdef HAVE_PTHREAD 00506 CInputParser* this_obj = (CInputParser *) params; 00507 this->input_source = this_obj->input_source; 00508 00509 while (1) 00510 { 00511 pthread_mutex_lock(&examples_state_lock); 00512 if (parsing_done) 00513 { 00514 pthread_mutex_unlock(&examples_state_lock); 00515 return NULL; 00516 } 00517 pthread_mutex_unlock(&examples_state_lock); 00518 00519 pthread_testcancel(); 00520 00521 current_example = examples_ring->get_free_example(); 00522 current_feature_vector = current_example->fv.vector; 00523 current_len = current_example->fv.vlen; 00524 current_label = current_example->label; 00525 00526 if (example_type == E_LABELLED) 00527 get_vector_and_label(current_feature_vector, current_len, current_label); 00528 else 00529 get_vector_only(current_feature_vector, current_len); 00530 00531 if (current_len < 0) 00532 { 00533 pthread_mutex_lock(&examples_state_lock); 00534 parsing_done = true; 00535 pthread_cond_signal(&examples_state_changed); 00536 pthread_mutex_unlock(&examples_state_lock); 00537 return NULL; 00538 } 00539 00540 current_example->label = current_label; 00541 current_example->fv.vector = current_feature_vector; 00542 current_example->fv.vlen = current_len; 00543 00544 examples_ring->copy_example(current_example); 00545 00546 pthread_mutex_lock(&examples_state_lock); 00547 number_of_vectors_parsed++; 00548 pthread_cond_signal(&examples_state_changed); 00549 pthread_mutex_unlock(&examples_state_lock); 00550 } 00551 #endif /* HAVE_PTHREAD */ 00552 return NULL; 00553 } 00554 00555 template <class T> Example<T>* CInputParser<T>::retrieve_example() 00556 { 00557 /* This function should be guarded by mutexes while calling */ 00558 Example<T> *ex; 00559 00560 if (parsing_done) 00561 { 00562 if (number_of_vectors_read == number_of_vectors_parsed) 00563 { 00564 reading_done = true; 00565 /* Signal to waiting threads that no more examples are left */ 00566 pthread_cond_signal(&examples_state_changed); 00567 return NULL; 00568 } 00569 } 00570 00571 if (number_of_vectors_parsed <= 0) 00572 return NULL; 00573 00574 if (number_of_vectors_read == number_of_vectors_parsed) 00575 { 00576 return NULL; 00577 } 00578 00579 ex = examples_ring->get_unused_example(); 00580 number_of_vectors_read++; 00581 00582 return ex; 00583 } 00584 00585 template <class T> int32_t CInputParser<T>::get_next_example(T* &fv, 00586 int32_t &length, float64_t &label) 00587 { 00588 /* if reading is done, no more examples can be fetched. return 0 00589 else, if example can be read, get the example and return 1. 00590 otherwise, wait for further parsing, get the example and 00591 return 1 */ 00592 00593 Example<T> *ex; 00594 00595 while (1) 00596 { 00597 if (reading_done) 00598 return 0; 00599 00600 pthread_mutex_lock(&examples_state_lock); 00601 ex = retrieve_example(); 00602 00603 if (ex == NULL) 00604 { 00605 if (reading_done) 00606 { 00607 /* No more examples left, return */ 00608 pthread_mutex_unlock(&examples_state_lock); 00609 return 0; 00610 } 00611 else 00612 { 00613 /* Examples left, wait for one to become ready */ 00614 pthread_cond_wait(&examples_state_changed, &examples_state_lock); 00615 pthread_mutex_unlock(&examples_state_lock); 00616 continue; 00617 } 00618 } 00619 else 00620 { 00621 /* Example ready, return the example */ 00622 pthread_mutex_unlock(&examples_state_lock); 00623 break; 00624 } 00625 } 00626 00627 fv = ex->fv.vector; 00628 length = ex->fv.vlen; 00629 label = ex->label; 00630 00631 return 1; 00632 } 00633 00634 template <class T> 00635 int32_t CInputParser<T>::get_next_example(T* &fv, int32_t &length) 00636 { 00637 float64_t label_dummy; 00638 00639 return get_next_example(fv, length, label_dummy); 00640 } 00641 00642 template <class T> 00643 void CInputParser<T>::finalize_example() 00644 { 00645 examples_ring->finalize_example(free_after_release); 00646 } 00647 00648 template <class T> void CInputParser<T>::end_parser() 00649 { 00650 pthread_join(parse_thread, NULL); 00651 } 00652 00653 template <class T> void CInputParser<T>::exit_parser() 00654 { 00655 pthread_cancel(parse_thread); 00656 } 00657 } 00658 #endif // __INPUTPARSER_H__