Main Page | Modules | Alphabetical List | Data Structures | File List | Data Fields | Globals

db_postgres.c

00001 #include <libpq-fe.h>
00002 #include <stdlib.h>
00003 #include <string.h>
00004 #include <stdarg.h>
00005 #ifdef CP_HAS_NETINET_IN_H
00006 #include <netinet/in.h>
00007 #endif /* CP_HAS_NETINET_IN_H */
00008 #ifdef _WINDOWS
00009 #include "socket.h"
00010 #endif /* _WINDOWS */
00011 #include <time.h>
00012 #include <stdio.h>
00013 
00014 #include "config.h"
00015 #include "common.h"
00016 #include "db.h"
00017 #include "str.h"
00018 #include "util.h"
00019 #include "hashtable.h"
00020 #include "vector.h"
00021 #include "log.h"
00022 
00023 #include "db_postgres.h"
00024 
00025 static cp_db_connection *pgsql_open_conn(cp_data_source *data_source);
00026 static cp_result_set *pgsql_select(cp_db_connection *_conn, char *query);
00027 static int pgsql_fetch_metadata(cp_result_set *result_set);
00028 static int pgsql_fetch_next(cp_result_set *result_set);
00029 static int pgsql_release_result_set(cp_result_set *result_set);
00030 static int pgsql_update(cp_db_connection *_conn, char *query);
00031 static int pgsql_close_conn(cp_db_connection *connection);
00032 static char *pgsql_escape_string(cp_db_connection *conn, char *str, int len);
00033 static char *pgsql_escape_binary(cp_db_connection *conn, char *str, 
00034                                  int len, int *res_len);
00035 static cp_string *pgsql_unescape_binary(cp_db_connection *conn, char *str);
00036 static cp_db_statement *
00037     pgsql_prepare_statement(cp_db_connection *conn, int prm_count, 
00038                             cp_field_type *prm_type, char *query);
00039 static int pgsql_execute_statement(cp_db_connection *conn, cp_db_statement *stmt, 
00040                                    cp_result_set **result_set, int *lengths, 
00041                                    void **prm);
00042 static void pgsql_release_statement(cp_db_statement *statement);
00043 static void pgsql_set_autocommit(cp_db_connection *conn, int state);
00044 static int pgsql_commit(cp_db_connection *conn);
00045 static int pgsql_rollback(cp_db_connection *conn);
00046 
00047 volatile static int initialized = 0;
00048 static cp_db_actions *pgsql_db_actions = NULL;
00049 static cp_hashtable *pgsql_field_type_map = NULL;
00050 
00051 #define DBNAME "postgres"
00052 
00053 typedef struct _field_type_map
00054 {
00055     int pgsql_field_code;
00056     int cprops_field_code;
00057 } field_type_map;
00058 
00059 field_type_map pgsql2cprops[] = 
00060 {
00061     {    16, CP_FIELD_TYPE_BOOLEAN },   // bool
00062     {    17, CP_FIELD_TYPE_BLOB },      // bytea
00063     {    18, CP_FIELD_TYPE_CHAR },      // char
00064     {    19, CP_FIELD_TYPE_VARCHAR },   // name
00065     {    20, CP_FIELD_TYPE_LONG_LONG }, // int8
00066     {    21, CP_FIELD_TYPE_SHORT },     // int2
00067 //  {    22, CP_FIELD_TYPE_ },          // int2vector
00068     {    23, CP_FIELD_TYPE_INT },       // int4
00069     {    25, CP_FIELD_TYPE_VARCHAR },   // text
00070     {    26, CP_FIELD_TYPE_INT },       // oid
00071     {   602, CP_FIELD_TYPE_VARCHAR },   // path
00072     {   700, CP_FIELD_TYPE_FLOAT },     // float4
00073     {   701, CP_FIELD_TYPE_DOUBLE },    // float8
00074     {   702, CP_FIELD_TYPE_DATE }, // abstime
00075     {   703, CP_FIELD_TYPE_TIME }, // reltime
00076     {   704, CP_FIELD_TYPE_TIME }, // tinterval
00077 //      {   790, CP_FIELD_TYPE_ }, // money
00078 //      {   791, CP_FIELD_TYPE_ }, // _money
00079     {   829, CP_FIELD_TYPE_VARCHAR }, // macaddr
00080     {   869, CP_FIELD_TYPE_VARCHAR }, // inet
00081 //      {   650, CP_FIELD_TYPE_ }, // cidr
00082     {  1000, CP_FIELD_TYPE_BOOLEAN }, // _bool
00083     {  1001, CP_FIELD_TYPE_VARCHAR }, // _bytea
00084     {  1002, CP_FIELD_TYPE_VARCHAR }, // _char
00085     {  1003, CP_FIELD_TYPE_VARCHAR }, // _name
00086     {  1005, CP_FIELD_TYPE_SHORT }, // _int2
00087 //  {  1006, CP_FIELD_TYPE_ }, // _int2vector
00088     {  1007, CP_FIELD_TYPE_INT }, // _int4
00089     {  1009, CP_FIELD_TYPE_VARCHAR }, // _text
00090     {  1014, CP_FIELD_TYPE_VARCHAR }, // _bpchar
00091     {  1015, CP_FIELD_TYPE_VARCHAR }, // _varchar
00092     {  1016, CP_FIELD_TYPE_LONG_LONG }, // _int8
00093     {  1021, CP_FIELD_TYPE_FLOAT }, // _float4
00094     {  1022, CP_FIELD_TYPE_DOUBLE }, // _float8
00095     {  1023, CP_FIELD_TYPE_DATE }, // _abstime
00096     {  1024, CP_FIELD_TYPE_TIME }, // _reltime
00097     {  1025, CP_FIELD_TYPE_TIME }, // _tinterval
00098     {  1033, CP_FIELD_TYPE_VARCHAR }, // aclitem
00099     {  1034, CP_FIELD_TYPE_VARCHAR }, // _aclitem
00100     {  1040, CP_FIELD_TYPE_VARCHAR }, // _macaddr
00101     {  1041, CP_FIELD_TYPE_VARCHAR }, // _inet
00102 //  {   651, CP_FIELD_TYPE_ }, // _cidr
00103     {  1042, CP_FIELD_TYPE_VARCHAR }, // bpchar
00104     {  1043, CP_FIELD_TYPE_VARCHAR }, // varchar
00105     {  1082, CP_FIELD_TYPE_DATE }, // date
00106     {  1083, CP_FIELD_TYPE_TIME }, // time
00107     {  1114, CP_FIELD_TYPE_TIMESTAMP }, // timestamp
00108     {  1115, CP_FIELD_TYPE_TIMESTAMP }, // _timestamp
00109     {  1182, CP_FIELD_TYPE_DATE }, // _date
00110     {  1183, CP_FIELD_TYPE_TIME }, // _time
00111     {  1184, CP_FIELD_TYPE_TIMESTAMP }, // timestamptz
00112     {  1185, CP_FIELD_TYPE_TIMESTAMP }, // _timestamptz
00113     {  1186, CP_FIELD_TYPE_TIME }, // interval
00114     {  1187, CP_FIELD_TYPE_TIME }, // _interval
00115     {  1231, CP_FIELD_TYPE_INT }, // _numeric
00116     {  1266, CP_FIELD_TYPE_TIME }, // timetz
00117     {  1270, CP_FIELD_TYPE_TIME }, // _timetz
00118     {  1560, CP_FIELD_TYPE_BOOLEAN }, // bit
00119     {  1561, CP_FIELD_TYPE_BOOLEAN }, // _bit
00120     {  1562, CP_FIELD_TYPE_LONG }, // varbit
00121     {  1563, CP_FIELD_TYPE_LONG }, // _varbit
00122     {  1700, CP_FIELD_TYPE_LONG }, // numeric
00123     {  2275, CP_FIELD_TYPE_VARCHAR }, // cstring
00124     {  2276, CP_FIELD_TYPE_VARCHAR }, // any
00125     { 10635, CP_FIELD_TYPE_LONG }, // cardinal_number
00126     { 10637, CP_FIELD_TYPE_VARCHAR } // character_data
00127 };
00128 
00129 void cp_pgsql_shutdown()
00130 {
00131     if (!initialized) return;
00132     initialized = 0;
00133 
00134     if (pgsql_db_actions)
00135     {
00136         cp_db_actions_destroy(pgsql_db_actions);
00137         pgsql_db_actions = NULL;
00138     }
00139 
00140     if (pgsql_field_type_map)
00141     {
00142         cp_hashtable_destroy(pgsql_field_type_map);
00143         pgsql_field_type_map = NULL;
00144     }
00145 }
00146 
00147 void cp_dbms_postgres_init()
00148 {
00149     int id;
00150     int i, types;
00151     if (initialized) return;
00152     initialized = 1;
00153 
00154 #ifdef _WINDOWS
00155     /* must initialize winsock or else */
00156     cp_socket_init();
00157 #endif /* _WINDOWS */
00158 
00159     id = cp_db_register_dbms(DBNAME, cp_pgsql_shutdown);
00160 
00161     pgsql_db_actions = 
00162         cp_db_actions_create(id, DBNAME, pgsql_open_conn, pgsql_select, 
00163                 pgsql_fetch_metadata, pgsql_fetch_next, 
00164                 pgsql_release_result_set, pgsql_update, pgsql_close_conn,
00165                 pgsql_escape_string, pgsql_escape_binary, 
00166                 pgsql_unescape_binary, pgsql_prepare_statement, 
00167                 pgsql_execute_statement, pgsql_release_statement,
00168                 pgsql_set_autocommit, pgsql_commit, pgsql_rollback);
00169 
00170     types = sizeof(pgsql2cprops) / sizeof(field_type_map);
00171     pgsql_field_type_map = 
00172         cp_hashtable_create_by_mode(COLLECTION_MODE_NOSYNC, types,
00173                                     cp_hash_int, cp_hash_compare_int);
00174     for (i = 0; i < types; i++)
00175         cp_hashtable_put(pgsql_field_type_map,
00176                          &pgsql2cprops[i].pgsql_field_code,
00177                          &pgsql2cprops[i].cprops_field_code);
00178 }
00179 
00180 static void 
00181     cp_pgsql_connection_parameters_delete(cp_pgsql_connection_parameters *prm)
00182 {
00183     if (prm)
00184     {
00185         if (prm->host) free(prm->host);
00186         if (prm->login) free(prm->login);
00187         if (prm->password) free(prm->password);
00188         if (prm->db_name) free(prm->db_name);
00189         if (prm->options) free(prm->options);
00190         if (prm->sslmode) free(prm->sslmode);
00191         if (prm->krbsrvname) free(prm->krbsrvname);
00192         if (prm->service) free(prm->service);
00193     }
00194 }
00195 
00196 cp_data_source *
00197     cp_postgres_data_source(char *host, 
00198                             int port, 
00199                             char *login, 
00200                             char *password, 
00201                             char *db_name,
00202                             char *options,
00203                             char *sslmode,
00204                             char *krbsrvname, 
00205                             char *service)
00206 {
00207     cp_data_source *data_source;
00208     cp_pgsql_connection_parameters *impl;
00209     
00210     if (!initialized) cp_dbms_postgres_init();
00211 
00212     data_source = cp_calloc(1, sizeof(cp_data_source));
00213     if (data_source == NULL) return NULL;
00214 
00215     data_source->act = pgsql_db_actions;
00216 
00217     data_source->prm = cp_calloc(1, sizeof(cp_db_connection_parameters));
00218     if (data_source->prm == NULL)
00219     {
00220         cp_free(data_source);
00221         return NULL;
00222     }
00223     impl = calloc(1, sizeof(cp_pgsql_connection_parameters));
00224     if (impl == NULL)
00225     {
00226         cp_free(data_source->prm);
00227         cp_free(data_source);
00228         return NULL;
00229     }
00230     //~~ check for alloc failures
00231     impl->host = host ? strdup(host) : NULL;
00232     impl->port = port;
00233     impl->login = login ? strdup(login) : NULL;
00234     impl->password = password ? strdup(password) : NULL;
00235     impl->db_name = db_name ? strdup(db_name) : NULL;
00236     impl->options = options ? strdup(options) : NULL;
00237     impl->sslmode = sslmode ? strdup(sslmode) : NULL;
00238     impl->krbsrvname = krbsrvname ? strdup(krbsrvname) : NULL;
00239     impl->service = service ? strdup(service) : NULL;
00240 
00241     data_source->prm->impl = impl;
00242     data_source->prm->impl_dtr = 
00243         (cp_destructor_fn) cp_pgsql_connection_parameters_delete;
00244 
00245     return data_source;
00246 }
00247 
00248 cp_data_source *
00249     cp_dbms_postgres_get_data_source(char *host, 
00250                                      int port, 
00251                                      char *login, 
00252                                      char *password,
00253                                      char *dbname)
00254 {
00255     return cp_postgres_data_source(host, port, login, password, dbname, 
00256                                    NULL, NULL, NULL, NULL);
00257 }
00258 
00259 cp_data_source *
00260     cp_dbms_postgres_get_data_source_prm(char *host,
00261                                          int port, 
00262                                          char *login, 
00263                                          char *password, 
00264                                          char *dbname, 
00265                                          cp_hashtable *prm)
00266 {
00267     return cp_postgres_data_source(host, port, login, password, dbname, 
00268                                    cp_hashtable_get(prm, "options"),
00269                                    cp_hashtable_get(prm, "sslmode"),
00270                                    cp_hashtable_get(prm, "krbsrvname"),
00271                                    cp_hashtable_get(prm, "service"));
00272 }
00273 
00274 static int pgsql_close_conn(cp_db_connection *connection)
00275 {
00276     void *conn = connection->connection;
00277     if (conn == NULL) return -1;
00278     PQfinish(conn); 
00279     return 0;
00280 }
00281 
00282 void cs_append(cp_string *cs, char *title, char *value)
00283 {
00284     cp_string_cstrcat(cs, " ");
00285     cp_string_cstrcat(cs, title);
00286     cp_string_cstrcat(cs, "=");
00287     cp_string_cstrcat(cs, value);
00288 }
00289 
00290 static cp_db_connection *pgsql_open_conn(cp_data_source *data_source)
00291 {
00292     PGconn *conn = NULL;
00293     cp_string *cs = cp_string_create("", 0);
00294     cp_pgsql_connection_parameters *conf = data_source->prm->impl;
00295     cp_db_connection *res;
00296 
00297     if (conf->host) cs_append(cs, "host", conf->host);
00298     if (conf->port) 
00299     {
00300         char buf[32];
00301 #ifdef CP_HAS_SNPRINTF
00302         snprintf(buf, 32, " port=%d", conf->port);
00303 #else
00304         sprintf(buf, " port=%d", conf->port);
00305 #endif /* CP_HAS_SNPRINTF */
00306         cp_string_cstrcat(cs, buf);
00307     }
00308     if (conf->login) cs_append(cs, "user", conf->login);
00309     if (conf->password) cs_append(cs, "password", conf->password);
00310     if (conf->db_name) cs_append(cs, "dbname", conf->db_name);
00311     if (conf->options) cs_append(cs, "options", conf->options);
00312     if (conf->sslmode) cs_append(cs, "sslmode", conf->sslmode);
00313     if (conf->krbsrvname) cs_append(cs, "krbsrvname", conf->krbsrvname);
00314     if (conf->service) cs_append(cs, "service", conf->service);
00315 
00316     conn = PQconnectdb(cp_string_tocstr(cs));
00317 
00318     cp_string_delete(cs);
00319 
00320     if (conn == NULL) return NULL;
00321 
00322     if (PQstatus(conn) == CONNECTION_BAD) 
00323     {
00324         cp_error(CP_DBMS_CONNECTION_FAILURE, 
00325                  "PGSQL: connection to database %s failed", conf->db_name); 
00326         cp_error(CP_DBMS_CONNECTION_FAILURE, 
00327                  "PGSQL: %s", PQerrorMessage(conn));
00328         goto failed;
00329     }
00330 
00331 #ifdef __TRACE__
00332     cp_info("PGSQL: Connected to server at %s", conf->host);
00333 #endif
00334 
00335     /* good. now set up connection wrapper */
00336     res = cp_calloc(1, sizeof(cp_db_connection));
00337     res->data_source = data_source;
00338     res->connection = conn;
00339     res->read_result_set_at_once = 1;
00340     res->autocommit = 1;
00341     return res;
00342 
00343 failed:
00344     PQfinish(conn); 
00345     return NULL;
00346 }
00347 
00348 static int pgsql_result_error(PGresult *res)
00349 {
00350     switch(PQresultStatus(res)) 
00351     {
00352         case PGRES_EMPTY_QUERY:
00353         case PGRES_BAD_RESPONSE:
00354         case PGRES_NONFATAL_ERROR:
00355         case PGRES_FATAL_ERROR:
00356             cp_error(1, "PGSQL: %s", PQresultErrorMessage(res));
00357             PQclear(res);
00358             return 1;
00359     
00360         default: 
00361             break;
00362     }
00363 
00364     return 0;
00365 }
00366 
00367 static cp_result_set *
00368     create_result_set(cp_db_connection *_conn, PGresult *res)
00369 {
00370     cp_result_set *result_set = cp_calloc(1, sizeof(cp_result_set));
00371     result_set->field_count = PQnfields(res);
00372     result_set->row_count = PQntuples(res);
00373     result_set->results = cp_list_create();
00374     result_set->connection = _conn;
00375     result_set->source = res;
00376 
00377 //  always fetch meta data - column types needed to decide whether to unescape 
00378 //  BYTEA columns
00379     pgsql_fetch_metadata(result_set);
00380 
00381     return result_set;
00382 }
00383 
00384 cp_string *pgsql_unescape_binary(cp_db_connection *conn, char *str)
00385 {
00386     int newlen;
00387     char *unescaped = PQunescapeBytea(str, &newlen);
00388     cp_string *res = cp_string_create(unescaped, newlen);
00389     PQfreemem(unescaped);
00390     return res;
00391 }
00392     
00393 static int fetch_rows(cp_result_set *result_set, PGresult *res)
00394 {
00395     int i, j;
00396     cp_list *list = result_set->results;
00397     cp_vector *rec;
00398     cp_field_type *type; 
00399 
00400     for (i = 0; i < result_set->row_count; i++)
00401     {
00402         /* check for chunked reading */
00403         if (!result_set->connection->read_result_set_at_once 
00404                 && result_set->connection->fetch_size != 0 
00405                 && i == result_set->connection->fetch_size) break;
00406 
00407         if (result_set->binary)
00408             rec = cp_vector_create(result_set->field_count);
00409         else
00410             rec = cp_vector_create_by_option(result_set->field_count, 
00411                 COLLECTION_MODE_DEEP, 
00412                 NULL, (cp_destructor_fn) cp_string_destroy);
00413 
00414         for (j = 0; j < result_set->field_count; j++)
00415         {
00416             if (!PQgetisnull(res, i, j))
00417             {
00418                 type = cp_vector_element_at(result_set->field_types, j);
00419                 if (result_set->binary)
00420                 {
00421                     switch (*type)
00422                     {
00423                         case CP_FIELD_TYPE_BOOLEAN:
00424                         {
00425                             char *b = calloc(1, sizeof(short));
00426                             short *p = (short *) b;
00427                             memcpy(b, PQgetvalue(res, i, j), 1);
00428                             *p = (short) *b;
00429                             cp_vector_set_element(rec, j, p);
00430                         }
00431                         break;
00432 
00433                         case CP_FIELD_TYPE_SHORT:
00434                         {
00435                             short *p = malloc(sizeof(short));
00436                             memcpy(p, PQgetvalue(res, i, j), sizeof(short));
00437 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
00438                             *p = ntohs(*p);
00439 #endif /* CP_BYTE_ORDER_LITTLE_ENDIAN */
00440                             cp_vector_set_element(rec, j, p);
00441                         }
00442                         break;
00443 
00444                         case CP_FIELD_TYPE_INT:
00445                         {
00446                             int *p = malloc(sizeof(int));
00447                             memcpy(p, PQgetvalue(res, i, j), sizeof(int));
00448 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
00449                             *p = ntohl(*p);
00450 #endif /* CP_BYTE_ORDER_LITTLE_ENDIAN */
00451                             cp_vector_set_element(rec, j, p);
00452                         }
00453                         break;
00454 
00455                         case CP_FIELD_TYPE_LONG:
00456                         {
00457                             long *p = malloc(sizeof(long));
00458                             memcpy(p, PQgetvalue(res, i, j), sizeof(long));
00459 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
00460                             if (sizeof(long) == sizeof(int))
00461                                 *p = ntohl(*p);
00462 #endif /* CP_BYTE_ORDER_LITTLE_ENDIAN */
00463                             cp_vector_set_element(rec, j, p);
00464                         }
00465                         break;
00466 
00467                         case CP_FIELD_TYPE_FLOAT:
00468                         {
00469                             float *p = calloc(1, sizeof(float));
00470 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
00471                             int *q = (int *) p;
00472 #endif /* CP_BYTE_ORDER_LITTLE_ENDIAN */
00473                             memcpy(p, PQgetvalue(res, i, j), sizeof(float));
00474 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
00475                             *q = ntohl(*q);
00476 #endif /* CP_BYTE_ORDER_LITTLE_ENDIAN */
00477                             cp_vector_set_element(rec, j, p);
00478                         }
00479                         break;
00480 
00481                         case CP_FIELD_TYPE_DOUBLE:
00482                         {
00483                             double *p = malloc(sizeof(double));
00484 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
00485                             int *q = (int *) p;
00486                             int w;
00487 #endif /* CP_BYTE_ORDER_LITTLE_ENDIAN */
00488                             memcpy(p, PQgetvalue(res, i, j), sizeof(double));
00489 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN 
00490                             /* believe it or not */
00491                             q[0] = ntohl(q[0]);
00492                             q[1] = ntohl(q[1]);
00493                             w = q[0];
00494                             q[0] = q[1];
00495                             q[1] = w;
00496 #endif /* CP_BYTE_ORDER_LITTLE_ENDIAN */
00497                             cp_vector_set_element(rec, j, p);
00498                         }
00499                         break;
00500 
00501                         case CP_FIELD_TYPE_TIME:
00502                         case CP_FIELD_TYPE_DATE:
00503                         case CP_FIELD_TYPE_TIMESTAMP:
00504                         {
00505                             union 
00506                             {
00507                                 double p;
00508                                 int q[2];
00509                             } up;
00510 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN 
00511                             int w;
00512 #endif /* CP_BYTE_ORDER_LITTLE_ENDIAN */
00513                             cp_timestampz *t = calloc(1, sizeof(cp_timestampz));
00514                             memcpy(&up.p, PQgetvalue(res, i, j), sizeof(double));
00515 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN 
00516                             up.q[0] = ntohl(up.q[0]);
00517                             up.q[1] = ntohl(up.q[1]);
00518                             w = up.q[0];
00519                             up.q[0] = up.q[1];
00520                             up.q[1] = w;
00521 #endif /* CP_BYTE_ORDER_LITTLE_ENDIAN */
00522                             up.p += (30 * 365 + 7) * 24 * 3600;
00523                             t->tm.tv_sec = up.p;
00524                             t->tm.tv_usec = (up.p - t->tm.tv_sec) * 1000000;
00525                             cp_vector_set_element(rec, j, t);
00526                         }
00527                         break;
00528                         
00529                         default:
00530                             cp_vector_set_element(rec, j, 
00531                                 cp_string_create(PQgetvalue(res, i, j), 
00532                                                  PQgetlength(res, i, j)));
00533                             break;
00534                     }
00535                 }
00536                 else if (*type == CP_FIELD_TYPE_BLOB)
00537                     cp_vector_set_element(rec, j, 
00538                         pgsql_unescape_binary(NULL, PQgetvalue(res, i, j)));
00539                 else
00540                     cp_vector_set_element(rec, j, 
00541                             cp_string_create(PQgetvalue(res, i, j), 
00542                                              PQgetlength(res, i, j)));
00543             }
00544         }
00545 
00546         cp_list_append(list, rec);
00547     }
00548 
00549     if (i == result_set->row_count) 
00550     {
00551         result_set->fetch_complete = 1;
00552         PQclear(res);
00553         result_set->source = NULL;
00554     }
00555             
00556     return i;
00557 }
00558 
00559 static cp_result_set *pgsql_select(cp_db_connection *_conn, char *query)
00560 {
00561     int rc;
00562     PGresult *res = NULL;
00563     PGconn *conn = (PGconn *) _conn->connection;
00564     cp_result_set *result_set = NULL;
00565 
00566     rc = PQsendQuery(conn, query);
00567     while (PQflush(conn) == 1);
00568     if (rc == 0) 
00569     {
00570         cp_error(CP_DBMS_QUERY_FAILED, "%s", query);
00571         return NULL;
00572     }
00573 
00574     if ((res = PQgetResult(conn)) != NULL)
00575     {
00576         result_set = create_result_set(_conn, res);
00577         do 
00578         {
00579             if (pgsql_result_error(res)) return NULL;
00580             fetch_rows(result_set, res);
00581             /* if doing chunked reading, it's time to break */
00582             if (!_conn->read_result_set_at_once) break;
00583         }
00584         while ((res = PQgetResult(conn)) != NULL);
00585     }
00586     else while (PQflush(conn) == 1);
00587 
00588     return result_set;
00589 }
00590 
00591 static int convert_field_type(int field_type)
00592 {
00593     int *type = cp_hashtable_get(pgsql_field_type_map, &field_type);
00594     return type != NULL ? *type : field_type;
00595 }
00596 
00597 static int pgsql_fetch_metadata(cp_result_set *result_set)
00598 {
00599     int rc = -1;
00600     int j;
00601 
00602     if (result_set->field_headers == NULL)
00603     {
00604         PGresult *res = result_set->source;
00605         result_set->field_headers = 
00606             cp_vector_create_by_option(result_set->field_count, 
00607                                        COLLECTION_MODE_NOSYNC | 
00608                                        COLLECTION_MODE_COPY | 
00609                                        COLLECTION_MODE_DEEP |
00610                                        COLLECTION_MODE_MULTIPLE_VALUES, 
00611                                        (cp_copy_fn) strdup, 
00612                                        (cp_destructor_fn) free);
00613         result_set->field_types = 
00614             cp_vector_create_by_option(result_set->field_count, 
00615                                        COLLECTION_MODE_NOSYNC | 
00616                                        COLLECTION_MODE_COPY | 
00617                                        COLLECTION_MODE_DEEP |
00618                                        COLLECTION_MODE_MULTIPLE_VALUES,
00619                                        (cp_copy_fn) intdup, 
00620                                        (cp_destructor_fn) free);
00621         for (j = 0; j < result_set->field_count; j++)
00622         {
00623             int type = convert_field_type(PQftype(result_set->source, j));
00624             cp_vector_set_element(result_set->field_headers, j, 
00625                                   PQfname(res, j));
00626             cp_vector_set_element(result_set->field_types, j, &type);
00627         }
00628         rc = 0;
00629     }
00630 
00631     return rc;
00632 }
00633 
00634 static int pgsql_fetch_next(cp_result_set *result_set)
00635 {
00636     int i, j;
00637     PGresult *res = result_set->source;
00638     PGconn *conn = (PGconn *) result_set->connection->connection;
00639     cp_vector *rec;
00640 
00641     while (res)
00642     {
00643         if (pgsql_result_error(res)) return -1;
00644 
00645         for (i = result_set->position; i < result_set->row_count; i++)
00646         {
00647             if (((i + 1) % result_set->connection->fetch_size) == 0) break;
00648 
00649             rec = cp_vector_create_by_option(result_set->field_count, 
00650                     COLLECTION_MODE_DEEP, 
00651                     NULL, (cp_destructor_fn) cp_string_destroy);
00652 
00653             for (j = 0; j < result_set->field_count; j++)
00654             {
00655                 if (!PQgetisnull(res, i, j))
00656                     cp_vector_set_element(rec, j, 
00657                         cp_string_create(PQgetvalue(res, i, j), 
00658                                          PQgetlength(res, i, j)));
00659             }
00660 
00661             cp_list_append(result_set->results, rec);
00662         }
00663 
00664         if (i == result_set->row_count) 
00665         {
00666             result_set->fetch_complete = 1;
00667             PQclear(res);
00668             result_set->source = NULL;
00669         }
00670         else /* read chunk, return */
00671             break;
00672 
00673         res = PQgetResult(conn);
00674     }
00675 
00676     return 0;
00677 }
00678 
00679 static int pgsql_release_result_set(cp_result_set *result_set)
00680 {
00681     if (result_set->source)
00682     {
00683         PGresult *res = result_set->source;
00684         PGconn *conn = (PGconn *) result_set->connection->connection;
00685         do
00686         {
00687             PQclear(res);
00688         } while ((res = PQgetResult(conn)) != NULL);
00689     }
00690 
00691     return 0;
00692 }
00693 
00694 static int pgsql_update(cp_db_connection *_conn, char *query)
00695 {
00696     int rows = 0;
00697     PGresult *res = NULL;
00698     PGconn *conn = (PGconn*) _conn->connection;
00699     int rc;
00700 
00701     rc = PQsendQuery(conn, query);
00702     while (PQflush(conn) == 1);
00703     if (rc == 0) 
00704     {
00705         cp_error(CP_DBMS_QUERY_FAILED, "%s", query);
00706         return -1;
00707     }
00708 
00709     while ((res = PQgetResult(conn)) != NULL)
00710     {
00711         switch (PQresultStatus(res)) 
00712         {
00713             case PGRES_BAD_RESPONSE:
00714             case PGRES_NONFATAL_ERROR:
00715             case PGRES_FATAL_ERROR:
00716                 cp_error(CP_DBMS_QUERY_FAILED, "postgres: %s", query);
00717                 cp_error(CP_DBMS_QUERY_FAILED, 
00718                          "postgres: %s", PQresultErrorMessage(res));
00719                 PQclear(res);
00720                 return -1;
00721 
00722             default:
00723                 rows += atoi(PQcmdTuples(res));
00724                 break;
00725         }
00726         PQclear(res);
00727     }
00728 
00729     return rows;
00730 }
00731 
00732 static char *pgsql_escape_string(cp_db_connection *conn, char *src, int len)
00733 {
00734     char *res = malloc(len * 2 + 1); /* postgres spec */
00735     PQescapeString(res, src, len);
00736     return res;
00737 }
00738 
00739 static char *pgsql_escape_binary(cp_db_connection *conn, char *src, 
00740                                  int len, int *res_len)
00741 {
00742     return PQescapeBytea(src, len, res_len);
00743 }
00744 
00745 //~~ magic numbers are postgres OIDs - maybe include 
00746 //~~ postgresql/server/catalog/pg_type.h instead
00747 static Oid cprops2pgsql(cp_field_type type)
00748 {
00749     switch(type)
00750     {
00751         case CP_FIELD_TYPE_BOOLEAN: return 16;
00752         case CP_FIELD_TYPE_CHAR: return 1043; // 18
00753         case CP_FIELD_TYPE_SHORT:return 21;
00754         case CP_FIELD_TYPE_INT: return 23;
00755         case CP_FIELD_TYPE_LONG: return sizeof(long) == sizeof(int) ? 23 : 20;
00756         case CP_FIELD_TYPE_LONG_LONG: return 20;
00757         case CP_FIELD_TYPE_FLOAT: return 700;
00758         case CP_FIELD_TYPE_DOUBLE: return 701;
00759         case CP_FIELD_TYPE_VARCHAR: return 1043;
00760         case CP_FIELD_TYPE_BLOB: return 17;
00761         case CP_FIELD_TYPE_DATE: return 1114; // 1082
00762         case CP_FIELD_TYPE_TIME: return 1083;
00763         case CP_FIELD_TYPE_TIMESTAMP: return 1114;
00764         default: return 2276;
00765     }
00766 
00767     return 2276;
00768 }
00769 
00770 char *format_prm(char *query, int prm_count)
00771 {
00772     char *subst = query;
00773     char marker[5]; /* no more than 999 parameters */
00774     int count = 1;
00775     
00776     if (prm_count)
00777     {
00778         cp_string *buf = NULL;
00779         char *curr = query;
00780         char *prev = NULL;
00781         while  ((curr = strchr(curr, '?')) != NULL)
00782         {
00783             if (buf == NULL) 
00784             {
00785                 buf = cp_string_create("", 0);
00786                 prev = query;
00787             }
00788 
00789             cp_string_cat_bin(buf, prev, curr - prev);
00790 #ifdef CP_HAS_SNPRINTF
00791             snprintf(marker, 5, "$%d", count);
00792 #else
00793             sprintf(marker, "$%d", count);
00794 #endif /* CP_HAS_SNPRINTF */
00795             count++;
00796             cp_string_cstrcat(buf, marker);
00797             prev = ++curr;
00798         }
00799         if (prev) cp_string_cstrcat(buf, prev);
00800         if (buf)
00801         {
00802             subst = buf->data;
00803             free(buf);
00804         }
00805     }
00806 
00807     return subst;
00808 }
00809 
00810 static cp_db_statement *pgsql_prepare_statement(cp_db_connection *conn, 
00811                                                 int prm_count, 
00812                                                 cp_field_type *prm_type, 
00813                                                 char *query)
00814 {
00815     cp_db_statement *statement = NULL;
00816     int i;
00817     PGresult *res;
00818     Oid *paramTypes = malloc(prm_count * sizeof(Oid));
00819     char *stmt = malloc(21);
00820     char *subst;
00821     ExecStatusType rc;
00822 
00823 #ifdef CP_HAS_STRLCPY
00824     strlcpy(stmt, "STMT", 21);
00825 #else
00826     strcpy(stmt, "STMT");
00827 #endif /* CP_HAS_STRLCPY */
00828     gen_id_str(&stmt[4]);
00829 
00830     for (i = 0; i < prm_count; i++)
00831         paramTypes[i] = cprops2pgsql(prm_type[i]);
00832     
00833     /* parameters may be given as ?, postgres expects $1, $2 etc */
00834     subst = format_prm(query, prm_count);
00835     res = PQprepare(conn->connection, stmt, subst, prm_count, paramTypes);
00836     free(paramTypes);
00837     if (subst != query) free(subst);
00838 
00839     if (res == NULL)
00840     {
00841         cp_error(CP_DBMS_CLIENT_ERROR, "%s - prepare statement: %s", 
00842                  conn->data_source->act->dbms_lit, 
00843                  PQerrorMessage(conn->connection));
00844         free(stmt);
00845     }
00846     else
00847     {
00848         rc = PQresultStatus(res);
00849         switch (rc)
00850         {
00851             case PGRES_EMPTY_QUERY:
00852             case PGRES_BAD_RESPONSE:
00853             case PGRES_FATAL_ERROR:
00854                 free(stmt);
00855                 stmt = NULL;
00856                 cp_error(CP_DBMS_CLIENT_ERROR, "%s - prepare statement: %s", 
00857                          conn->data_source->act->dbms_lit, 
00858                          PQresultErrorMessage(res));
00859                 break;
00860 
00861             case PGRES_NONFATAL_ERROR:
00862                 cp_warn("%s - prepare statement: %s", 
00863                         conn->data_source->act->dbms_lit, 
00864                         PQresultErrorMessage(res));
00865             default:
00866                 statement = cp_db_statement_instantiate(conn, prm_count, 
00867                                                         prm_type, stmt);
00868                 if (statement == NULL)
00869                 {
00870                     //~~ report error
00871                     free(stmt);
00872                 }
00873                 break;
00874             
00875         }
00876         PQclear(res);
00877     }
00878 
00879     if (statement)
00880         statement->store = memdup(prm_type, prm_count * sizeof(cp_field_type));
00881 
00882     return statement;
00883 }
00884 
00885 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
00886 
00887 void hton8byte(void *ptr)
00888 {
00889     int w;
00890     union
00891     {
00892         double d;
00893         int i[2];
00894     } *u;
00895 
00896     u = ptr;
00897     w = ntohl(u->i[0]);
00898     u->i[0] = ntohl(u->i[1]);
00899     u->i[1] = w;
00900 }
00901     
00902     
00903 static void **to_network_order(cp_field_type *types, void **prm, int count)
00904 {
00905     int i;
00906     void **res = calloc(count, sizeof(void *));
00907     short *pshort;
00908     int *pint;
00909     long *plong;
00910     float *pfloat;
00911     double *pdouble;
00912     
00913     for (i = 0; i < count; i++)
00914     {
00915         switch (types[i])
00916         {
00917             case CP_FIELD_TYPE_SHORT:
00918                 pshort = malloc(sizeof(short));
00919                 memcpy(pshort, prm[i], sizeof(short));
00920                 *pshort = htons(*pshort);
00921                 res[i] = pshort;
00922                 break;
00923 
00924             case CP_FIELD_TYPE_INT:
00925                 pint = malloc(sizeof(int));
00926                 memcpy(pint, prm[i], sizeof(int));
00927                 *pint = htonl(*pint);
00928                 res[i] = pint;
00929                 break;
00930 
00931             case CP_FIELD_TYPE_LONG:
00932                 if (sizeof(long) == sizeof(int))
00933                 {
00934                     plong = malloc(sizeof(long));
00935                     memcpy(plong, prm[i], sizeof(long));
00936                     *plong = htonl(*plong);
00937                     res[i] = plong;
00938                     break;
00939                 }
00940 
00941             case CP_FIELD_TYPE_FLOAT:
00942                 pfloat = malloc(sizeof(float));
00943                 memcpy(pfloat, prm[i], sizeof(float));
00944                 plong = (long *) pfloat;
00945                 *plong = htonl(*plong);
00946                 res[i] = pfloat;
00947                 break;
00948 
00949             case CP_FIELD_TYPE_DOUBLE:
00950                 pdouble = malloc(sizeof(double));
00951                 memcpy(pdouble, prm[i], sizeof(double));
00952                 hton8byte(pdouble);
00953                 res[i] = pdouble;
00954                 break;
00955 
00956             case CP_FIELD_TYPE_TIME:
00957             case CP_FIELD_TYPE_DATE:
00958             case CP_FIELD_TYPE_TIMESTAMP:
00959                 res[i] = prm[i];
00960                 hton8byte(res[i]);
00961                 break;
00962 
00963             default:
00964                 res[i] = prm[i];
00965         }
00966     }
00967 
00968     return res;
00969 }
00970 
00971 void delete_prm(void **in_order_prm, cp_field_type *types, int count)
00972 {
00973     int i;
00974 
00975     for (i = 0; i < count; i++)
00976     {
00977         switch (types[i])
00978         {
00979             case CP_FIELD_TYPE_SHORT:
00980             case CP_FIELD_TYPE_INT:
00981             case CP_FIELD_TYPE_FLOAT:
00982             case CP_FIELD_TYPE_DOUBLE:
00983                 free(in_order_prm[i]);
00984                 break;
00985 
00986             case CP_FIELD_TYPE_LONG:
00987                 if (sizeof(long) == sizeof(int))
00988                 {
00989                     free(in_order_prm[i]);
00990                     break;
00991                 }
00992 
00993             default:
00994                 break;
00995         }
00996     }
00997 
00998     free(in_order_prm);
00999 }
01000 #endif /* CP_BYTE_ORDER_LITTLE_ENDIAN */
01001 
01002 static void *to_pgtime(cp_timestampz *tmz)
01003 {
01004     double *d = calloc(1, sizeof(double));
01005     *d = tmz->tm.tv_sec + ((double) tmz->tm.tv_usec) / 1e6
01006          + tmz->tz_minuteswest * 60 - ((30 * 365 + 7) * 86400);
01007     return d;
01008 }
01009 
01010 static void subst_cp2pgsql(int count, cp_field_type *types, void **prm, int *lengths)
01011 {
01012     int i;
01013     short *s;
01014     char *ch;
01015     
01016     for (i = 0; i < count; i++)
01017     {
01018         switch (types[i])
01019         {
01020             case CP_FIELD_TYPE_BOOLEAN:
01021                 s = prm[i];
01022                 ch = prm[i];
01023                 *ch = (prm[i] == 0 ? 'f' : 't');
01024                 lengths[i] = 1;
01025                 break;
01026 
01027             case CP_FIELD_TYPE_TIME:
01028             case CP_FIELD_TYPE_DATE:
01029             case CP_FIELD_TYPE_TIMESTAMP:
01030                 prm[i] = to_pgtime(prm[i]);
01031                 lengths[i] = sizeof(double);
01032                 break;
01033 
01034             default:
01035                 break;
01036         }
01037     }
01038 }
01039 
01040 static void free_cp2pgsql(int count, cp_field_type *types, void **prm)
01041 {
01042     int i;
01043     
01044     for (i = 0; i < count; i++)
01045     {
01046         if (types[i] == CP_FIELD_TYPE_TIME ||
01047             types[i] == CP_FIELD_TYPE_DATE ||
01048             types[i] == CP_FIELD_TYPE_TIMESTAMP)
01049         {
01050             free(prm[i]);
01051         }
01052     }
01053 }
01054 
01055 static int pgsql_execute_statement(cp_db_connection *conn, 
01056                                    cp_db_statement *stmt, 
01057                                    cp_result_set **result_set, 
01058                                    int *lengths, 
01059                                    void **prm)
01060 {
01061     int *fmt = NULL;
01062     cp_result_set *res = NULL;
01063     PGresult *pgres = NULL;
01064     ExecStatusType ret;
01065     int i;
01066     int rc = 0;
01067     cp_field_type *types = NULL;
01068 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
01069     void **in_order_prm = prm;
01070 #endif /* CP_BYTE_ORDER_LITTLE_ENDIAN */
01071 
01072     subst_cp2pgsql(stmt->prm_count, stmt->store, prm, lengths);
01073 
01074     if (lengths)
01075     {
01076         types = stmt->store;
01077         fmt = calloc(stmt->prm_count, sizeof(int));
01078         for (i = 0; i < stmt->prm_count; i++) fmt[i] = 1;
01079 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
01080         in_order_prm = to_network_order(types, prm, stmt->prm_count);
01081 #endif /* CP_BYTE_ORDER_LITTLE_ENDIAN */
01082     }
01083 
01084     pgres = PQexecPrepared(conn->connection, (char *) stmt->source,
01085                            stmt->prm_count,
01086 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
01087                            (const char * const *) in_order_prm,
01088 #else
01089                            (const char * const *) prm,
01090 #endif /* CP_BYTE_ORDER_LITTLE_ENDIAN */                           
01091                            lengths, fmt, stmt->binary);
01092     free_cp2pgsql(stmt->prm_count, stmt->store, prm);
01093     if (fmt) free(fmt);
01094 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
01095     if (in_order_prm != prm) delete_prm(in_order_prm, types, stmt->prm_count);
01096 #endif /* CP_BYTE_ORDER_LITTLE_ENDIAN */
01097     
01098     ret = PQresultStatus(pgres);
01099     switch (ret)
01100     {
01101         case PGRES_TUPLES_OK:
01102             res = create_result_set(conn, pgres);
01103             res->binary = stmt->binary;
01104             fetch_rows(res, pgres);
01105             break;
01106 
01107         case PGRES_COMMAND_OK:
01108             rc = atoi(PQcmdTuples(pgres));
01109             PQclear(pgres);
01110             break;
01111 
01112         case PGRES_EMPTY_QUERY:
01113         case PGRES_BAD_RESPONSE:
01114         case PGRES_FATAL_ERROR:
01115         case PGRES_NONFATAL_ERROR:
01116         default:
01117             rc = -1;
01118             PQclear(pgres);
01119             cp_error(CP_DBMS_QUERY_FAILED, "%s - prepared statement: %s", 
01120                      conn->data_source->act->dbms_lit, 
01121                      PQresultErrorMessage(pgres));
01122             break;
01123     }
01124 
01125     if (result_set)
01126         *result_set = res;
01127 
01128     return rc;
01129 }
01130 
01131 static void pgsql_release_statement(cp_db_statement *statement)
01132 {
01133     if (statement->source) free(statement->source);
01134     if (statement->store) free(statement->store);
01135 }
01136 
01137 static void pgsql_set_autocommit(cp_db_connection *conn, int state)
01138 {
01139     if (state == 0)
01140         pgsql_update(conn, "begin;");
01141 }
01142 
01143 static int pgsql_commit(cp_db_connection *conn)
01144 {
01145     int rc = pgsql_update(conn, "commit;");
01146     if (conn->autocommit == 0)
01147         pgsql_update(conn, "begin;");
01148     return rc;
01149 }
01150 
01151 static int pgsql_rollback(cp_db_connection *conn)
01152 {
01153     int rc = pgsql_update(conn, "rollback;");
01154     if (conn->autocommit == 0)
01155         pgsql_update(conn, "begin;");
01156     return rc;
01157 }
01158 
01159 #ifdef WINDOWS
01160 BOOL APIENTRY DllMain( HANDLE hModule, 
01161                        DWORD  ul_reason_for_call, 
01162                        LPVOID lpReserved
01163                      )
01164 {
01165     switch (ul_reason_for_call)
01166     {
01167         case DLL_PROCESS_ATTACH:
01168         case DLL_THREAD_ATTACH:
01169         case DLL_THREAD_DETACH:
01170         case DLL_PROCESS_DETACH:
01171             break;
01172     }
01173     return TRUE;
01174 }
01175 #endif /* _WINDOWS */

Generated on Sat Dec 1 10:25:29 2007 for cprops by  doxygen 1.3.9.1