00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef ASQL_HPP
00022 #define ASQL_HPP
00023
00024 #include <vector>
00025 #include <queue>
00026 #include <cstring>
00027
00028 #include <boost/date_time/posix_time/posix_time.hpp>
00029 #include <boost/shared_ptr.hpp>
00030 #include <boost/shared_array.hpp>
00031 #include <boost/scoped_array.hpp>
00032 #include <boost/function.hpp>
00033 #include <boost/bind.hpp>
00034 #include <boost/thread.hpp>
00035
00037 namespace ASql
00038 {
//! SQL error carrying a numeric code and a message pointer.
/*!
 * Only the message pointer is stored; the text itself is never copied, so the
 * pointed-to string has to outlive every copy of the error object.
 */
struct Error: public std::exception
{
	//! Numeric error code; 0 for a default-constructed error.
	int erno;

	//! Message text (not owned); null for a default-constructed error.
	const char* msg;

	//! Build an empty error: code 0, no message.
	Error(): erno(0), msg(0) {}

	//! Build an error from a message pointer and a code.
	/*!
	 * @param[in] msg_ Message text; only the pointer is kept.
	 * @param[in] erno_ Numeric error code.
	 */
	Error(const char* msg_, const int erno_): erno(erno_), msg(msg_) {}

	//! Copy the code and the message pointer.
	Error(const Error& e): erno(e.erno), msg(e.msg) {}

	//! Message text for std::exception clients.
	/*!
	 * Never returns null: an error without a message yields an empty string so
	 * that callers can safely print or compare the result.
	 */
	const char* what() const throw() { return msg ? msg : ""; }
};
00062
00064 namespace Data
00065 {
//! Tag describing what kind of value an Index points at.
/*!
 * The enumerators are consecutive starting at 0. The second group repeats the
 * first group with an _N suffix, in the same order, and NOTHING closes the
 * list.
 */
enum Type
{
	// Unsigned integers
	U_TINY = 0,
	U_SHORT,
	U_INT,
	U_BIGINT,

	// Signed integers
	TINY,
	SHORT,
	INT,
	BIGINT,

	// Floating point
	FLOAT,
	DOUBLE,

	// Date and time
	TIME,
	DATE,
	DATETIME,

	// Variable and fixed size data
	BLOB,
	TEXT,
	WTEXT,
	CHAR,
	BINARY,
	BIT,

	// _N counterparts of everything above, same order
	U_TINY_N,
	U_SHORT_N,
	U_INT_N,
	U_BIGINT_N,
	TINY_N,
	SHORT_N,
	INT_N,
	BIGINT_N,
	FLOAT_N,
	DOUBLE_N,
	TIME_N,
	DATE_N,
	DATETIME_N,
	BLOB_N,
	TEXT_N,
	WTEXT_N,
	CHAR_N,
	BINARY_N,
	BIT_N,

	// Marks an Index that refers to nothing
	NOTHING
};
00112
//! Non-template base shared by every nullable wrapper.
/*!
 * Holds the null flag and gives type-erased access to the wrapped value.
 */
struct NullablePar
{
	//! Set the initial null state.
	/*!
	 * @param[in] _nullness Initial value of the null flag.
	 */
	NullablePar(bool _nullness): nullness(_nullness) { }

	//! Virtual so that destroying a wrapper through this base is well defined.
	virtual ~NullablePar() {}

	//! Null flag for the wrapped value.
	bool nullness;

	//! Untyped pointer to the wrapped value.
	virtual void* getVoid() =0;
};
00132
00137 template<class T> struct Nullable: public NullablePar
00138 {
00139 T object;
00140 void* getVoid() { return &object; }
00141 operator T() { return object; }
00142 operator const T() const { return object; }
00143 Nullable(): NullablePar(false) {}
00144 Nullable(const T& x): NullablePar(false), object(x) { }
00145 };
00146
00150 template<class T, int size> struct NullableArray: public NullablePar
00151 {
00152 T object[size];
00153 void* getVoid() { return object; }
00154 operator T*() { return object; }
00155 NullableArray(): NullablePar(false) {}
00156 NullableArray(const T& x): NullablePar(false), object(x) { }
00157 };
00158
00162 template<class charT, class Traits, class T> inline std::basic_ostream<charT, Traits>& operator<<(std::basic_ostream<charT, Traits>& os, const Nullable<T>& x)
00163 {
00164 if(x.nullness)
00165 os << "NULL";
00166 else
00167 os << x.object;
00168
00169 return os;
00170 }
00171
00172 typedef unsigned char Utiny;
00173 typedef signed char Tiny;
00174 typedef unsigned short int Ushort;
00175 typedef short int Short;
00176 typedef unsigned int Uint;
00177 typedef int Int;
00178 typedef unsigned long long int Ubigint;
00179 typedef long long int Bigint;
00180 typedef float Float;
00181 typedef double Double;
00182 typedef boost::posix_time::time_duration Time;
00183 typedef boost::gregorian::date Date;
00184 typedef boost::posix_time::ptime Datetime;
00185 typedef std::vector<char> Blob;
00186 typedef std::string Text;
00187 typedef std::wstring Wtext;
00188
00189
00190 typedef Nullable<unsigned char> UtinyN;
00191 typedef Nullable<char> TinyN;
00192 typedef Nullable<unsigned short int> UshortN;
00193 typedef Nullable<short int> ShortN;
00194 typedef Nullable<unsigned int> UintN;
00195 typedef Nullable<int> IntN;
00196 typedef Nullable<unsigned long long int> UbigintN;
00197 typedef Nullable<long long int> BigintN;
00198 typedef Nullable<float> FloatN;
00199 typedef Nullable<double> DoubleN;
00200 typedef Nullable<boost::posix_time::time_duration> TimeN;
00201 typedef Nullable<boost::gregorian::date> DateN;
00202 typedef Nullable<boost::posix_time::ptime> DatetimeN;
00203 typedef Nullable<std::vector<char> > BlobN;
00204 typedef Nullable<std::string> TextN;
00205 typedef Nullable<std::wstring> WtextN;
00206
00207
00214 struct Index
00215 {
00216 Type type;
00217 void* data;
00218 size_t size;
00219
00220 Index(const Utiny& x): type(U_TINY), data(const_cast<Utiny*>(&x)) { }
00221 Index(const Tiny& x): type(TINY), data(const_cast<Tiny*>(&x)) { }
00222 Index(const Ushort& x): type(U_SHORT), data(const_cast<Ushort*>(&x)) { }
00223 Index(const Short& x): type(SHORT), data(const_cast<Short*>(&x)) { }
00224 Index(const Uint& x): type(U_INT), data(const_cast<Uint*>(&x)) { }
00225 Index(const Int& x): type(INT), data(const_cast<Int*>(&x)) { }
00226 Index(const Ubigint& x): type(U_BIGINT), data(const_cast<Ubigint*>(&x)) { }
00227 Index(const Bigint& x): type(BIGINT), data(const_cast<Bigint*>(&x)) { }
00228 Index(const Float& x): type(FLOAT), data(const_cast<Float*>(&x)) { }
00229 Index(const Double& x): type(DOUBLE), data(const_cast<Double*>(&x)) { }
00230 Index(const Time& x): type(TIME), data(const_cast<Time*>(&x)) { }
00231 Index(const Date& x): type(DATE), data(const_cast<Date*>(&x)) { }
00232 Index(const Datetime& x): type(DATETIME), data(const_cast<Datetime*>(&x)) { }
00233 Index(const Blob& x): type(BLOB), data(const_cast<Blob*>(&x)) { }
00234 Index(const Text& x): type(TEXT), data(const_cast<Text*>(&x)) { }
00235 Index(const Wtext& x): type(WTEXT), data(const_cast<Wtext*>(&x)) { }
00236 Index(const char* const x, const size_t size_): type(CHAR), data(const_cast<char*>(x)), size(size_) { }
00237 template<class T> explicit Index(const T& x): type(BINARY), data(const_cast<T*>(&x)), size(sizeof(T)) { }
00238 Index(const UtinyN& x): type(U_TINY_N), data(const_cast<UtinyN*>(&x)) { }
00239 Index(const TinyN& x): type(TINY_N), data(const_cast<TinyN*>(&x)) { }
00240 Index(const UshortN& x): type(U_SHORT_N), data(const_cast<UshortN*>(&x)) { }
00241 Index(const ShortN& x): type(SHORT_N), data(const_cast<ShortN*>(&x)) { }
00242 Index(const UintN& x): type(U_INT_N), data(const_cast<UintN*>(&x)) { }
00243 Index(const IntN& x): type(INT_N), data(const_cast<IntN*>(&x)) { }
00244 Index(const UbigintN& x): type(U_BIGINT_N), data(const_cast<UbigintN*>(&x)) { }
00245 Index(const BigintN& x): type(BIGINT_N), data(const_cast<BigintN*>(&x)) { }
00246 Index(const FloatN& x): type(FLOAT_N), data(const_cast<FloatN*>(&x)) { }
00247 Index(const DoubleN& x): type(DOUBLE_N), data(const_cast<DoubleN*>(&x)) { }
00248 Index(const TimeN& x): type(TIME_N), data(const_cast<TimeN*>(&x)) { }
00249 Index(const DateN& x): type(DATE_N), data(const_cast<DateN*>(&x)) { }
00250 Index(const DatetimeN& x): type(DATETIME_N), data(const_cast<DatetimeN*>(&x)) { }
00251 Index(const BlobN& x): type(BLOB_N), data(const_cast<BlobN*>(&x)) { }
00252 Index(const TextN& x): type(TEXT_N), data(const_cast<TextN*>(&x)) { }
00253 Index(const WtextN& x): type(WTEXT_N), data(const_cast<WtextN*>(&x)) { }
00254 template<int size_> Index(const NullableArray<char, size_>& x): type(CHAR_N), data(const_cast<NullableArray<char, size_>*>(&x)), size(size_) { }
00255 template<class T> explicit Index(const Nullable<T>& x): type(BINARY_N), data(const_cast<Nullable<T>*>(&x)), size(sizeof(T)) { }
00256
00257 Index(const Index& x): type(x.type), data(x.data), size(x.size) {}
00258 Index(): type(NOTHING), data(0), size(0) {}
00259
00260 const Index& operator=(const Index& x) { type=x.type; data=x.data; size=x.size; return *this; }
00261 bool operator==(const Index& x) { return type==x.type && data==x.data && size==x.size; }
00262 };
00263
00331 struct Set
00332 {
00338 virtual size_t numberOfSqlElements() const =0;
00339
00350 virtual Index getSqlIndex(const size_t index) const =0;
00351
00352 virtual ~Set() {}
00353 };
00354
00361 template<class T> class SetBuilder: public Set
00362 {
00363 public:
00367 T data;
00368 private:
00372 virtual size_t numberOfSqlElements() const { return data.numberOfSqlElements(); }
00376 virtual Index getSqlIndex(const size_t index) const { return data.getSqlIndex(index); }
00377 };
00378
00385 template<class T> class SetRefBuilder: public Set
00386 {
00390 virtual size_t numberOfSqlElements() const { return m_data.numberOfSqlElements(); }
00394 virtual Index getSqlIndex(const size_t index) const { return m_data.getSqlIndex(index); }
00398 const T& m_data;
00399 public:
00403 inline SetRefBuilder(const T& x): m_data(x) {}
00404 };
00405
00415 template<class T> class SetPtrBuilder: public Set
00416 {
00420 const T* m_data;
00424 virtual size_t numberOfSqlElements() const { return m_data->numberOfSqlElements(); }
00428 virtual Index getSqlIndex(const size_t index) const { return m_data->getSqlIndex(index); }
00429 public:
00433 inline SetPtrBuilder(): m_data(0) {}
00437 inline SetPtrBuilder(const T& x): m_data(&x) {}
00438 inline SetPtrBuilder(SetPtrBuilder& x): m_data(x.m_data) {}
00442 inline void set(const T& data) { m_data=&data; }
00446 inline void clear() { m_data=0; }
00450 operator bool() const { return m_data; }
00451 };
00452
00462 template<class T> class SetSharedPtrBuilder: public Set
00463 {
00467 virtual size_t numberOfSqlElements() const { return data->numberOfSqlElements(); }
00471 public:
00472 virtual Index getSqlIndex(const size_t index) const { return data->getSqlIndex(index); }
00473 inline SetSharedPtrBuilder() {}
00474 inline SetSharedPtrBuilder(const boost::shared_ptr<T>& x): data(x) {}
00475 inline SetSharedPtrBuilder(SetSharedPtrBuilder& x): data(x.data) {}
00479 boost::shared_ptr<T> data;
00480 };
00481
00487 template<class T> class IndySetBuilder: public Set
00488 {
00489 public:
00493 T data;
00494 private:
00498 virtual size_t numberOfSqlElements() const { return 1; }
00502 virtual Index getSqlIndex(const size_t index) const { return data; }
00503 };
00504
00510 template<class T> class IndySetRefBuilder: public Set
00511 {
00515 const T& data;
00519 virtual size_t numberOfSqlElements() const { return 1; }
00523 virtual Index getSqlIndex(const size_t index) const { return data; }
00524 public:
00528 inline IndySetRefBuilder(const T& x): data(x) {}
00529 };
00530
00534 struct SetContainer
00535 {
00539 virtual Set& manufacture() =0;
00543 virtual void trim() =0;
00544 virtual ~SetContainer() {}
00550 virtual const Set* pull() const =0;
00551 };
00552
00564 template<class T> class STLSetContainer: public SetContainer
00565 {
00566 mutable SetPtrBuilder<typename T::value_type> m_buffer;
00567 mutable typename T::iterator m_itBuffer;
00568
00569 Set& manufacture()
00570 {
00571 data.push_back(typename T::value_type());
00572 m_buffer.set(data.back());
00573 return m_buffer;
00574 }
00575 void trim() { data.pop_back(); }
00576 const Set* pull() const
00577 {
00578 m_buffer.set(*m_itBuffer++);
00579 return &m_buffer;
00580 }
00581 public:
00585 T data;
00586 STLSetContainer(): m_itBuffer(data.begin()) {}
00587 };
00588
00600 template<class T> class STLSetRefContainer: public SetContainer
00601 {
00602 T& data;
00603 mutable SetPtrBuilder<typename T::value_type> m_buffer;
00604 mutable typename T::iterator m_itBuffer;
00605
00606 Set& manufacture()
00607 {
00608 data.push_back(typename T::value_type());
00609 m_buffer.set(data.back());
00610 return m_buffer;
00611 }
00612 void trim() { data.pop_back(); }
00613 const Set* pull() const
00614 {
00615 m_buffer.set(*m_itBuffer++);
00616 return &m_buffer;
00617 }
00618 public:
00622 STLSetRefContainer(T& x): data(x), m_itBuffer(data.begin()) {}
00623 };
00624
00636 template<class T> class STLSharedSetContainer: public SetContainer
00637 {
00638 mutable SetPtrBuilder<typename T::value_type> m_buffer;
00639 mutable typename T::iterator m_itBuffer;
00640
00641 Set& manufacture()
00642 {
00643 data->push_back(typename T::value_type());
00644 m_buffer.set(data->back());
00645 return m_buffer;
00646 }
00647 void trim() { data->pop_back(); }
00648 const Set* pull() const
00649 {
00650 m_buffer.set(*m_itBuffer++);
00651 return &m_buffer;
00652 }
00653 public:
00657 boost::shared_ptr<T> data;
00658 STLSharedSetContainer(const boost::shared_ptr<T>& x): data(x) {}
00659 STLSharedSetContainer(): m_itBuffer(data->begin()) {}
00660 };
00661
//! Abstract hook for converting one value between two representations.
struct Conversion
{
	//! Start with no external object attached.
	Conversion(): external(0) {}

	//! Virtual so that destroying a conversion through this base is well defined.
	virtual ~Conversion() {}

	//! Pointer to the storage used by this conversion.
	virtual void* getPointer() =0;

	//! Convert a result value.
	virtual void convertResult() =0;

	//! Convert a parameter value.
	virtual void convertParam() =0;

	//! Untyped pointer to the object on the other side of the conversion.
	void* external;
};
00689
//! Conversion objects keyed by an integer, held through shared pointers.
// NOTE(review): the key is presumably a column/parameter position -- confirm against the code that fills the map.
00690 typedef std::map<int, boost::shared_ptr<Conversion> > Conversions;
00691 }
00692
00705 class Query
00706 {
00707 private:
00711 struct SharedData
00712 {
00721 enum Flags { FLAG_SINGLE_PARAMETERS=1, FLAG_SINGLE_RESULTS=1<<1, FLAG_NO_MANAGE_RESULTS=1<<2, FLAG_NO_MANAGE_PARAMETERS=1<<3 };
00722 SharedData(): m_parameters(0), m_results(0), m_insertId(0), m_rows(0), m_cancel(false), m_flags(FLAG_SINGLE_PARAMETERS) {}
00728 ~SharedData()
00729 {
00730 delete m_rows;
00731 delete m_insertId;
00732 destroyResults();
00733 destroyParameters();
00734 }
00742 void* m_parameters;
00749 void* m_results;
00753 unsigned long long int* m_insertId;
00757 unsigned long long int* m_rows;
00761 Error m_error;
00765 boost::function<void()> m_callback;
00766 boost::mutex m_callbackMutex;
00770 bool m_cancel;
00788 unsigned char m_flags;
00789
00796 void destroyResults()
00797 {
00798 if(m_flags&FLAG_NO_MANAGE_RESULTS) return;
00799 if(m_flags&FLAG_SINGLE_RESULTS)
00800 delete static_cast<Data::Set*>(m_results);
00801 else
00802 delete static_cast<Data::SetContainer*>(m_results);
00803 }
00804
00811 void destroyParameters()
00812 {
00813 if(m_flags&FLAG_NO_MANAGE_PARAMETERS) return;
00814 if(m_flags&FLAG_SINGLE_PARAMETERS)
00815 delete static_cast<Data::Set*>(m_parameters);
00816 else
00817 delete static_cast<Data::SetContainer*>(m_parameters);
00818 }
00819 };
00820
00824 boost::shared_ptr<SharedData> m_sharedData;
00833 unsigned char m_flags;
00834
00835 enum Flags { FLAG_ORIGINAL=1, FLAG_KEEPALIVE=1<<1 };
00836
00840 void callback()
00841 {
00842 boost::lock_guard<boost::mutex> lock(m_sharedData->m_callbackMutex);
00843 if(!m_sharedData->m_callback.empty())
00844 m_sharedData->m_callback();
00845 }
00846
00847
00848 template<class T> friend class ConnectionPar;
00849
00850 public:
00854 Query(): m_sharedData(new SharedData), m_flags(FLAG_ORIGINAL) { }
00860 Query(const Query& x): m_sharedData(x.m_sharedData), m_flags(0) {}
00864 ~Query()
00865 {
00866 if(m_flags == FLAG_ORIGINAL)
00867 cancel();
00868 }
00872 unsigned int insertId() const { return m_sharedData->m_insertId?*(m_sharedData->m_insertId):0; }
00879 unsigned int rows() const { return m_sharedData->m_rows?*(m_sharedData->m_rows):0; }
00883 bool busy() const { return m_sharedData.use_count() != 1; }
00889 Error error() const { return m_sharedData->m_error; }
00890
00896 void setCallback(boost::function<void()> callback=boost::function<void()>())
00897 {
00898 boost::lock_guard<boost::mutex> lock(m_sharedData->m_callbackMutex);
00899 m_sharedData->m_callback = callback;
00900 }
00904 bool isCallback() { return !m_sharedData->m_callback.empty(); }
00908 boost::function<void()> getCallback()
00909 {
00910 return m_sharedData->m_callback;
00911 }
00917 void keepAlive(bool x) { if(x) m_flags|=FLAG_KEEPALIVE; else m_flags&=~FLAG_KEEPALIVE; }
00924 void cancel() { m_sharedData->m_cancel = true; }
00928 void clearCancel() { m_sharedData->m_cancel = false; }
00932 void enableRows()
00933 {
00934 if(!m_sharedData->m_rows)
00935 m_sharedData->m_rows = new unsigned long long;
00936 }
00940 void enableInsertId()
00941 {
00942 if(!m_sharedData->m_insertId)
00943 m_sharedData->m_insertId = new unsigned long long;
00944 }
00945
00952 void setResults(Data::Set* results) { m_sharedData->destroyResults(); m_sharedData->m_results=results; m_sharedData->m_flags|=SharedData::FLAG_SINGLE_RESULTS; }
00959 void setResults(Data::SetContainer* results) { m_sharedData->destroyResults(); m_sharedData->m_results=results; m_sharedData->m_flags&=~SharedData::FLAG_SINGLE_RESULTS; }
00966 void setParameters(Data::Set* parameters) { m_sharedData->destroyParameters(); m_sharedData->m_parameters=parameters; m_sharedData->m_flags|=SharedData::FLAG_SINGLE_PARAMETERS; }
00973 void setParameters(Data::SetContainer* parameters) { m_sharedData->destroyParameters(); m_sharedData->m_parameters=parameters; m_sharedData->m_flags &= ~SharedData::FLAG_SINGLE_PARAMETERS; }
00982 void manageResults(bool manage) { if(manage) m_sharedData->m_flags&=~SharedData::FLAG_NO_MANAGE_RESULTS; else m_sharedData->m_flags|=SharedData::FLAG_NO_MANAGE_RESULTS; }
00991 void manageParameters(bool manage) { if(manage) m_sharedData->m_flags&=~SharedData::FLAG_NO_MANAGE_PARAMETERS; else m_sharedData->m_flags|=SharedData::FLAG_NO_MANAGE_PARAMETERS; }
00992
01001 template<class T> T* createResults() { setResults(new T); return static_cast<T*>(results()); }
01010 template<class T> T* createParameters() { setParameters(new T); return static_cast<T*>(parameters()); }
01011
01020 void relinquishResults() { m_sharedData->m_results=0; m_sharedData->m_flags &= ~SharedData::FLAG_SINGLE_RESULTS; }
01028 void relinquishParameters() { m_sharedData->m_parameters=0; m_sharedData->m_flags |= SharedData::FLAG_SINGLE_PARAMETERS; }
01029
01035 void* results() { return m_sharedData->m_results; }
01041 void* parameters() { return m_sharedData->m_parameters; }
01042
01046 void clearResults() { m_sharedData->destroyResults(); relinquishResults(); }
01050 void clearParameters() { m_sharedData->destroyParameters(); relinquishParameters(); }
01051
01058 void reset() { m_sharedData.reset(new SharedData); m_flags=FLAG_ORIGINAL; keepAlive(false); }
01059
01060
01061
01062
01063
01064
01065
01066
01067
01068 };
01069
01083 template<class T> class Transaction
01084 {
01085 public:
01089 struct Item
01090 {
01091 Item(Query query, T* statement): m_query(query), m_statement(statement) {}
01092 Item(const Item& x): m_query(x.m_query), m_statement(x.m_statement) {}
01093 Query m_query;
01094 T* m_statement;
01095 };
01096 private:
01097 std::vector<Item> m_items;
01098 public:
01099 typedef typename std::vector<Item>::iterator iterator;
01108 inline void push(Query& query, T& statement) { m_items.push_back(Item(query, &statement)); }
01112 inline void clear() { m_items.clear(); }
01116 inline iterator begin() { return m_items.begin(); }
01120 inline iterator end() { return m_items.end(); }
01124 inline bool empty() { return m_items.size()==0; }
01125
01129 void cancel();
01130
01134 void start() { m_items.front().m_statement->connection.queue(*this); }
01135 };
01136
01140 class Connection
01141 {
01142 public:
01143 int threads() const { return maxThreads; }
01144 protected:
01148 const int maxThreads;
01149 boost::mutex threadsMutex;
01150 boost::condition_variable threadsChanged;
01151 int m_threads;
01152
01153 virtual void commit(const unsigned int thread=0)=0;
01154 virtual void rollback(const unsigned int thread=0)=0;
01155
01156 boost::scoped_array<boost::condition_variable> wakeUp;
01157
01158 boost::mutex terminateMutex;
01159 bool terminateBool;
01160
01161 Connection(const int maxThreads_): maxThreads(maxThreads_), m_threads(0), wakeUp(new boost::condition_variable[maxThreads_]) {}
01162 };
01163
01167 template<class T> class ConnectionPar: public Connection
01168 {
01169 private:
01170 struct QuerySet
01171 {
01172 QuerySet(Query& query, T* const& statement, const bool commit): m_query(query), m_statement(statement), m_commit(commit) {}
01173 QuerySet() {}
01174 Query m_query;
01175 bool m_commit;
01176 T* m_statement;
01177 };
01181 class Queries: public std::queue<QuerySet>, public boost::mutex {};
01182 boost::scoped_array<Queries> queries;
01183
01187 void intHandler(const unsigned int id);
01188
01192 class SetCanceler
01193 {
01194 const bool*& m_canceler;
01195 public:
01196 SetCanceler(const bool*& canceler, bool& dest): m_canceler(canceler) { canceler=&dest; }
01197 ~SetCanceler() { m_canceler=s_false; }
01198 };
01199
01200 protected:
01201 ConnectionPar(const int maxThreads_): Connection(maxThreads_), queries(new Queries[maxThreads_]) {}
01202 public:
01206 void start();
01210 void terminate();
01214 void queue(Transaction<T>& transaction);
01215 inline void queue(T* const& statement, Query& query);
01216
01217 static const bool s_false;
01218 };
01219
01223 class Statement
01224 {
01225 protected:
01226 boost::scoped_array<Data::Conversions> paramsConversions;
01227 boost::scoped_array<Data::Conversions> resultsConversions;
01228
01229 Statement(unsigned int threads):
01230 paramsConversions(new Data::Conversions[threads]),
01231 resultsConversions(new Data::Conversions[threads]) {}
01232 };
01233 }
01234
01235 template<class T> void ASql::ConnectionPar<T>::start()
01236 {
01237 {
01238 boost::lock_guard<boost::mutex> terminateLock(terminateMutex);
01239 terminateBool=false;
01240 }
01241
01242 boost::unique_lock<boost::mutex> threadsLock(threadsMutex);
01243 while(m_threads<maxThreads)
01244 {
01245 boost::thread(boost::bind(&ConnectionPar<T>::intHandler, boost::ref(*this), m_threads));
01246 threadsChanged.wait(threadsLock);
01247 }
01248 }
01249
01250 template<class T> void ASql::ConnectionPar<T>::terminate()
01251 {
01252 {
01253 boost::lock_guard<boost::mutex> terminateLock(terminateMutex);
01254 terminateBool=true;
01255 }
01256 for(boost::condition_variable* i=wakeUp.get(); i<wakeUp.get()+threads(); ++i)
01257 i->notify_all();
01258
01259 boost::unique_lock<boost::mutex> threadsLock(threadsMutex);
01260 while(m_threads)
01261 threadsChanged.wait(threadsLock);
01262 }
01263
01264 template<class T> void ASql::ConnectionPar<T>::intHandler(const unsigned int id)
01265 {
01266 {
01267 boost::lock_guard<boost::mutex> threadsLock(threadsMutex);
01268 ++m_threads;
01269 }
01270 threadsChanged.notify_one();
01271
01272 boost::unique_lock<boost::mutex> terminateLock(terminateMutex, boost::defer_lock_t());
01273 boost::unique_lock<boost::mutex> queriesLock(queries[id], boost::defer_lock_t());
01274
01275 while(1)
01276 {
01277 terminateLock.lock();
01278 if(terminateBool)
01279 break;
01280 terminateLock.unlock();
01281
01282 QuerySet querySet;
01283
01284 queriesLock.lock();
01285 if(!queries[id].size())
01286 {
01287 wakeUp[id].wait(queriesLock);
01288 queriesLock.unlock();
01289 continue;
01290 }
01291 querySet=queries[id].front();
01292 queries[id].pop();
01293 queriesLock.unlock();
01294
01295 Error error;
01296
01297 try
01298 {
01299 SetCanceler SetCanceler(querySet.m_statement->m_stop[id], querySet.m_query.m_sharedData->m_cancel);
01300 if(querySet.m_query.m_sharedData->m_flags & Query::SharedData::FLAG_SINGLE_PARAMETERS)
01301 {
01302 if(querySet.m_query.m_sharedData->m_flags & Query::SharedData::FLAG_SINGLE_RESULTS)
01303 {
01304 if(!querySet.m_statement->execute(static_cast<const Data::Set*>(querySet.m_query.parameters()), *static_cast<Data::Set*>(querySet.m_query.results()), false, id)) querySet.m_query.clearResults();
01305 }
01306 else
01307 querySet.m_statement->execute(static_cast<const Data::Set*>(querySet.m_query.parameters()), static_cast<Data::SetContainer*>(querySet.m_query.results()), querySet.m_query.m_sharedData->m_insertId, querySet.m_query.m_sharedData->m_rows, false, id);
01308 }
01309 else
01310 {
01311 querySet.m_statement->execute(*static_cast<const Data::SetContainer*>(querySet.m_query.parameters()), querySet.m_query.m_sharedData->m_rows, false, id);
01312 }
01313
01314 if(querySet.m_commit)
01315 commit(id);
01316 }
01317 catch(const Error& e)
01318 {
01319 querySet.m_query.m_sharedData->m_error=e;
01320
01321 rollback(id);
01322
01323 queriesLock.lock();
01324 QuerySet tmpQuerySet=querySet;
01325 while(!querySet.m_commit)
01326 {
01327 tmpQuerySet=queries[id].front();
01328 queries[id].pop();
01329 if(!querySet.m_query.isCallback() && tmpQuerySet.m_query.isCallback())
01330 querySet.m_query.setCallback(tmpQuerySet.m_query.getCallback());
01331
01332 }
01333 queriesLock.unlock();
01334 }
01335
01336 querySet.m_query.callback();
01337 }
01338
01339 {
01340 boost::lock_guard<boost::mutex> threadsLock(threadsMutex);
01341 --m_threads;
01342 }
01343 threadsChanged.notify_one();
01344 }
01345
01346
01347 template<class T> void ASql::ConnectionPar<T>::queue(T* const& statement, Query& query)
01348 {
01349 unsigned int instance=0;
01350 for(unsigned int i=1; i<threads(); ++i)
01351 {{
01352 boost::lock_guard<boost::mutex> queriesLock(queries[i]);
01353 if(queries[i].size() < queries[instance].size())
01354 instance=i;
01355 }}
01356
01357 boost::lock_guard<boost::mutex> queriesLock(queries[instance]);
01358 queries[instance].push(QuerySet(query, statement, true));
01359 wakeUp[instance].notify_one();
01360 }
01361
//! Definition of the constant false flag declared in ConnectionPar.
01362 template<class T> const bool ASql::ConnectionPar<T>::s_false = false;
01363
01364 template<class T> void ASql::ConnectionPar<T>::queue(Transaction<T>& transaction)
01365 {
01366 unsigned int instance=0;
01367 for(unsigned int i=1; i<threads(); ++i)
01368 {{
01369 boost::lock_guard<boost::mutex> queriesLock(queries[i]);
01370 if(queries[i].size() < queries[instance].size())
01371 instance=i;
01372 }}
01373
01374 boost::lock_guard<boost::mutex> queriesLock(queries[instance]);
01375
01376 for(typename Transaction<T>::iterator it=transaction.begin(); it!=transaction.end(); ++it)
01377 queries[instance].push(QuerySet(it->m_query, it->m_statement, false));
01378 queries[instance].back().m_commit = true;
01379
01380 wakeUp[instance].notify_one();
01381 }
01382
01383 template<class T> void ASql::Transaction<T>::cancel()
01384 {
01385 for(iterator it=begin(); it!=end(); ++it)
01386 it->m_query.cancel();
01387 }
01388
01389 #endif