db_postgres.c

00001 #include <libpq-fe.h>
00002 #include <stdlib.h>
00003 #include <string.h>
00004 #include <stdarg.h>
00005 
00006 #include "config.h"     // must come before using CP_HAS_xxx
00007 #ifdef CP_HAS_ARPA_INET_H
00008 #include <arpa/inet.h>
00009 #endif /* CP_HAS_ARPA_INET_H */
00010 #ifdef CP_HAS_NETINET_IN_H
00011 #include <netinet/in.h>
00012 #endif /* CP_HAS_NETINET_IN_H */
00013 #ifdef _WINDOWS
00014 #include "socket.h"
00015 #endif /* _WINDOWS */
00016 #include <time.h>
00017 #include <stdio.h>
00018 
00019 #include "common.h"
00020 #include "db.h"
00021 #include "str.h"
00022 #include "util.h"
00023 #include "hashtable.h"
00024 #include "vector.h"
00025 #include "log.h"
00026 
00027 #include "db_postgres.h"
00028 
00029 static cp_db_connection *pgsql_open_conn(cp_data_source *data_source);
00030 static cp_result_set *pgsql_select(cp_db_connection *_conn, char *query);
00031 static int pgsql_fetch_metadata(cp_result_set *result_set);
00032 static int pgsql_fetch_next(cp_result_set *result_set);
00033 static int pgsql_release_result_set(cp_result_set *result_set);
00034 static int pgsql_update(cp_db_connection *_conn, char *query);
00035 static int pgsql_close_conn(cp_db_connection *connection);
00036 static char *pgsql_escape_string(cp_db_connection *conn, char *str, size_t len);
00037 static char *pgsql_escape_binary(cp_db_connection *conn, char *str, 
00038                                  size_t len, size_t *res_len);
00039 static cp_string *pgsql_unescape_binary(cp_db_connection *conn, char *str);
00040 static cp_db_statement *
00041     pgsql_prepare_statement(cp_db_connection *conn, int prm_count, 
00042                             cp_field_type *prm_type, char *query);
00043 static int pgsql_execute_statement(cp_db_connection *conn, cp_db_statement *stmt, 
00044                                    cp_result_set **result_set, int *lengths, 
00045                                    void **prm);
00046 static void pgsql_release_statement(cp_db_statement *statement);
00047 static void pgsql_set_autocommit(cp_db_connection *conn, int state);
00048 static int pgsql_commit(cp_db_connection *conn);
00049 static int pgsql_rollback(cp_db_connection *conn);
00050 
00051 volatile static int initialized = 0;
00052 static cp_db_actions *pgsql_db_actions = NULL;
00053 static cp_hashtable *pgsql_field_type_map = NULL;
00054 
00055 #define DBNAME "postgres"
00056 
00057 typedef struct _field_type_map
00058 {
00059     int pgsql_field_code;
00060     int cprops_field_code;
00061 } field_type_map;
00062 
00063 field_type_map pgsql2cprops[] = 
00064 {
00065     {    16, CP_FIELD_TYPE_BOOLEAN },   // bool
00066     {    17, CP_FIELD_TYPE_BLOB },      // bytea
00067     {    18, CP_FIELD_TYPE_CHAR },      // char
00068     {    19, CP_FIELD_TYPE_VARCHAR },   // name
00069     {    20, CP_FIELD_TYPE_LONG_LONG }, // int8
00070     {    21, CP_FIELD_TYPE_SHORT },     // int2
00071 //  {    22, CP_FIELD_TYPE_ },          // int2vector
00072     {    23, CP_FIELD_TYPE_INT },       // int4
00073     {    25, CP_FIELD_TYPE_VARCHAR },   // text
00074     {    26, CP_FIELD_TYPE_INT },       // oid
00075     {   602, CP_FIELD_TYPE_VARCHAR },   // path
00076     {   700, CP_FIELD_TYPE_FLOAT },     // float4
00077     {   701, CP_FIELD_TYPE_DOUBLE },    // float8
00078     {   702, CP_FIELD_TYPE_DATE }, // abstime
00079     {   703, CP_FIELD_TYPE_TIME }, // reltime
00080     {   704, CP_FIELD_TYPE_TIME }, // tinterval
00081 //      {   790, CP_FIELD_TYPE_ }, // money
00082 //      {   791, CP_FIELD_TYPE_ }, // _money
00083     {   829, CP_FIELD_TYPE_VARCHAR }, // macaddr
00084     {   869, CP_FIELD_TYPE_VARCHAR }, // inet
00085 //      {   650, CP_FIELD_TYPE_ }, // cidr
00086     {  1000, CP_FIELD_TYPE_BOOLEAN }, // _bool
00087     {  1001, CP_FIELD_TYPE_VARCHAR }, // _bytea
00088     {  1002, CP_FIELD_TYPE_VARCHAR }, // _char
00089     {  1003, CP_FIELD_TYPE_VARCHAR }, // _name
00090     {  1005, CP_FIELD_TYPE_SHORT }, // _int2
00091 //  {  1006, CP_FIELD_TYPE_ }, // _int2vector
00092     {  1007, CP_FIELD_TYPE_INT }, // _int4
00093     {  1009, CP_FIELD_TYPE_VARCHAR }, // _text
00094     {  1014, CP_FIELD_TYPE_VARCHAR }, // _bpchar
00095     {  1015, CP_FIELD_TYPE_VARCHAR }, // _varchar
00096     {  1016, CP_FIELD_TYPE_LONG_LONG }, // _int8
00097     {  1021, CP_FIELD_TYPE_FLOAT }, // _float4
00098     {  1022, CP_FIELD_TYPE_DOUBLE }, // _float8
00099     {  1023, CP_FIELD_TYPE_DATE }, // _abstime
00100     {  1024, CP_FIELD_TYPE_TIME }, // _reltime
00101     {  1025, CP_FIELD_TYPE_TIME }, // _tinterval
00102     {  1033, CP_FIELD_TYPE_VARCHAR }, // aclitem
00103     {  1034, CP_FIELD_TYPE_VARCHAR }, // _aclitem
00104     {  1040, CP_FIELD_TYPE_VARCHAR }, // _macaddr
00105     {  1041, CP_FIELD_TYPE_VARCHAR }, // _inet
00106 //  {   651, CP_FIELD_TYPE_ }, // _cidr
00107     {  1042, CP_FIELD_TYPE_VARCHAR }, // bpchar
00108     {  1043, CP_FIELD_TYPE_VARCHAR }, // varchar
00109     {  1082, CP_FIELD_TYPE_DATE }, // date
00110     {  1083, CP_FIELD_TYPE_TIME }, // time
00111     {  1114, CP_FIELD_TYPE_TIMESTAMP }, // timestamp
00112     {  1115, CP_FIELD_TYPE_TIMESTAMP }, // _timestamp
00113     {  1182, CP_FIELD_TYPE_DATE }, // _date
00114     {  1183, CP_FIELD_TYPE_TIME }, // _time
00115     {  1184, CP_FIELD_TYPE_TIMESTAMP }, // timestamptz
00116     {  1185, CP_FIELD_TYPE_TIMESTAMP }, // _timestamptz
00117     {  1186, CP_FIELD_TYPE_TIME }, // interval
00118     {  1187, CP_FIELD_TYPE_TIME }, // _interval
00119     {  1231, CP_FIELD_TYPE_INT }, // _numeric
00120     {  1266, CP_FIELD_TYPE_TIME }, // timetz
00121     {  1270, CP_FIELD_TYPE_TIME }, // _timetz
00122     {  1560, CP_FIELD_TYPE_BOOLEAN }, // bit
00123     {  1561, CP_FIELD_TYPE_BOOLEAN }, // _bit
00124     {  1562, CP_FIELD_TYPE_LONG }, // varbit
00125     {  1563, CP_FIELD_TYPE_LONG }, // _varbit
00126     {  1700, CP_FIELD_TYPE_LONG }, // numeric
00127     {  2275, CP_FIELD_TYPE_VARCHAR }, // cstring
00128     {  2276, CP_FIELD_TYPE_VARCHAR }, // any
00129     { 10635, CP_FIELD_TYPE_LONG }, // cardinal_number
00130     { 10637, CP_FIELD_TYPE_VARCHAR } // character_data
00131 };
00132 
00133 void cp_pgsql_shutdown()
00134 {
00135     if (!initialized) return;
00136     initialized = 0;
00137 
00138     if (pgsql_db_actions)
00139     {
00140         cp_db_actions_destroy(pgsql_db_actions);
00141         pgsql_db_actions = NULL;
00142     }
00143 
00144     if (pgsql_field_type_map)
00145     {
00146         cp_hashtable_destroy(pgsql_field_type_map);
00147         pgsql_field_type_map = NULL;
00148     }
00149 }
00150 
00151 void cp_dbms_postgres_init()
00152 {
00153     int id;
00154     int i, types;
00155     if (initialized) return;
00156     initialized = 1;
00157 
00158 #ifdef _WINDOWS
00159     /* must initialize winsock or else */
00160     cp_socket_init();
00161 #endif /* _WINDOWS */
00162 
00163     id = cp_db_register_dbms(DBNAME, cp_pgsql_shutdown);
00164 
00165     pgsql_db_actions = 
00166         cp_db_actions_create(id, DBNAME, pgsql_open_conn, pgsql_select, 
00167                 pgsql_fetch_metadata, pgsql_fetch_next, 
00168                 pgsql_release_result_set, pgsql_update, pgsql_close_conn,
00169                 pgsql_escape_string, pgsql_escape_binary, 
00170                 pgsql_unescape_binary, pgsql_prepare_statement, 
00171                 pgsql_execute_statement, pgsql_release_statement,
00172                 pgsql_set_autocommit, pgsql_commit, pgsql_rollback);
00173 
00174     types = sizeof(pgsql2cprops) / sizeof(field_type_map);
00175     pgsql_field_type_map = 
00176         cp_hashtable_create_by_mode(COLLECTION_MODE_NOSYNC, types,
00177                                     cp_hash_int, cp_hash_compare_int);
00178     for (i = 0; i < types; i++)
00179         cp_hashtable_put(pgsql_field_type_map,
00180                          &pgsql2cprops[i].pgsql_field_code,
00181                          &pgsql2cprops[i].cprops_field_code);
00182 }
00183 
00184 static void 
00185     cp_pgsql_connection_parameters_delete(cp_pgsql_connection_parameters *prm)
00186 {
00187     if (prm)
00188     {
00189         if (prm->host) free(prm->host);
00190         if (prm->login) free(prm->login);
00191         if (prm->password) free(prm->password);
00192         if (prm->db_name) free(prm->db_name);
00193         if (prm->options) free(prm->options);
00194         if (prm->sslmode) free(prm->sslmode);
00195         if (prm->krbsrvname) free(prm->krbsrvname);
00196         if (prm->service) free(prm->service);
00197     }
00198 }
00199 
00200 cp_data_source *
00201     cp_postgres_data_source(char *host, 
00202                             int port, 
00203                             char *login, 
00204                             char *password, 
00205                             char *db_name,
00206                             char *options,
00207                             char *sslmode,
00208                             char *krbsrvname, 
00209                             char *service)
00210 {
00211     cp_data_source *data_source;
00212     cp_pgsql_connection_parameters *impl;
00213     
00214     if (!initialized) cp_dbms_postgres_init();
00215 
00216     data_source = cp_calloc(1, sizeof(cp_data_source));
00217     if (data_source == NULL) return NULL;
00218 
00219     data_source->act = pgsql_db_actions;
00220 
00221     data_source->prm = cp_calloc(1, sizeof(cp_db_connection_parameters));
00222     if (data_source->prm == NULL)
00223     {
00224         cp_free(data_source);
00225         return NULL;
00226     }
00227     impl = calloc(1, sizeof(cp_pgsql_connection_parameters));
00228     if (impl == NULL)
00229     {
00230         cp_free(data_source->prm);
00231         cp_free(data_source);
00232         return NULL;
00233     }
00234     //~~ check for alloc failures
00235     impl->host = host ? strdup(host) : NULL;
00236     impl->port = port;
00237     impl->login = login ? strdup(login) : NULL;
00238     impl->password = password ? strdup(password) : NULL;
00239     impl->db_name = db_name ? strdup(db_name) : NULL;
00240     impl->options = options ? strdup(options) : NULL;
00241     impl->sslmode = sslmode ? strdup(sslmode) : NULL;
00242     impl->krbsrvname = krbsrvname ? strdup(krbsrvname) : NULL;
00243     impl->service = service ? strdup(service) : NULL;
00244 
00245     data_source->prm->impl = impl;
00246     data_source->prm->impl_dtr = 
00247         (cp_destructor_fn) cp_pgsql_connection_parameters_delete;
00248 
00249     return data_source;
00250 }
00251 
00252 cp_data_source *
00253     cp_dbms_postgres_get_data_source(char *host, 
00254                                      int port, 
00255                                      char *login, 
00256                                      char *password,
00257                                      char *dbname)
00258 {
00259     return cp_postgres_data_source(host, port, login, password, dbname, 
00260                                    NULL, NULL, NULL, NULL);
00261 }
00262 
00263 cp_data_source *
00264     cp_dbms_postgres_get_data_source_prm(char *host,
00265                                          int port, 
00266                                          char *login, 
00267                                          char *password, 
00268                                          char *dbname, 
00269                                          cp_hashtable *prm)
00270 {
00271     return cp_postgres_data_source(host, port, login, password, dbname, 
00272                                    cp_hashtable_get(prm, "options"),
00273                                    cp_hashtable_get(prm, "sslmode"),
00274                                    cp_hashtable_get(prm, "krbsrvname"),
00275                                    cp_hashtable_get(prm, "service"));
00276 }
00277 
00278 static int pgsql_close_conn(cp_db_connection *connection)
00279 {
00280     void *conn = connection->connection;
00281     if (conn == NULL) return -1;
00282     PQfinish(conn); 
00283     return 0;
00284 }
00285 
00286 void cs_append(cp_string *cs, char *title, char *value)
00287 {
00288     cp_string_cstrcat(cs, " ");
00289     cp_string_cstrcat(cs, title);
00290     cp_string_cstrcat(cs, "=");
00291     cp_string_cstrcat(cs, value);
00292 }
00293 
00294 static cp_db_connection *pgsql_open_conn(cp_data_source *data_source)
00295 {
00296     PGconn *conn = NULL;
00297     cp_string *cs = cp_string_create("", 0);
00298     cp_pgsql_connection_parameters *conf = data_source->prm->impl;
00299     cp_db_connection *res;
00300 
00301     if (conf->host) cs_append(cs, "host", conf->host);
00302     if (conf->port) 
00303     {
00304         char buf[32];
00305 #ifdef CP_HAS_SNPRINTF
00306         snprintf(buf, 32, " port=%d", conf->port);
00307 #else
00308         sprintf(buf, " port=%d", conf->port);
00309 #endif /* CP_HAS_SNPRINTF */
00310         cp_string_cstrcat(cs, buf);
00311     }
00312     if (conf->login) cs_append(cs, "user", conf->login);
00313     if (conf->password) cs_append(cs, "password", conf->password);
00314     if (conf->db_name) cs_append(cs, "dbname", conf->db_name);
00315     if (conf->options) cs_append(cs, "options", conf->options);
00316     if (conf->sslmode) cs_append(cs, "sslmode", conf->sslmode);
00317     if (conf->krbsrvname) cs_append(cs, "krbsrvname", conf->krbsrvname);
00318     if (conf->service) cs_append(cs, "service", conf->service);
00319 
00320     conn = PQconnectdb(cp_string_tocstr(cs));
00321 
00322     cp_string_delete(cs);
00323 
00324     if (conn == NULL) return NULL;
00325 
00326     if (PQstatus(conn) == CONNECTION_BAD) 
00327     {
00328         cp_error(CP_DBMS_CONNECTION_FAILURE, 
00329                  "PGSQL: connection to database %s failed", conf->db_name); 
00330         cp_error(CP_DBMS_CONNECTION_FAILURE, 
00331                  "PGSQL: %s", PQerrorMessage(conn));
00332         goto failed;
00333     }
00334 
00335 #ifdef __TRACE__
00336     cp_info("PGSQL: Connected to server at %s", conf->host);
00337 #endif
00338 
00339     /* good. now set up connection wrapper */
00340     res = cp_calloc(1, sizeof(cp_db_connection));
00341     res->data_source = data_source;
00342     res->connection = conn;
00343     res->read_result_set_at_once = 1;
00344     res->autocommit = 1;
00345     return res;
00346 
00347 failed:
00348     PQfinish(conn); 
00349     return NULL;
00350 }
00351 
00352 static int pgsql_result_error(PGresult *res)
00353 {
00354     switch(PQresultStatus(res)) 
00355     {
00356         case PGRES_EMPTY_QUERY:
00357         case PGRES_BAD_RESPONSE:
00358         case PGRES_NONFATAL_ERROR:
00359         case PGRES_FATAL_ERROR:
00360             cp_error(1, "PGSQL: %s", PQresultErrorMessage(res));
00361             PQclear(res);
00362             return 1;
00363     
00364         default: 
00365             break;
00366     }
00367 
00368     return 0;
00369 }
00370 
00371 static cp_result_set *
00372     create_result_set(cp_db_connection *_conn, PGresult *res)
00373 {
00374     cp_result_set *result_set = cp_calloc(1, sizeof(cp_result_set));
00375     result_set->field_count = PQnfields(res);
00376     result_set->row_count = PQntuples(res);
00377     result_set->results = cp_list_create();
00378     result_set->connection = _conn;
00379     result_set->source = res;
00380 
00381 //  always fetch meta data - column types needed to decide whether to unescape 
00382 //  BYTEA columns
00383     pgsql_fetch_metadata(result_set);
00384 
00385     return result_set;
00386 }
00387 
00388 cp_string *pgsql_unescape_binary(cp_db_connection *conn, char *str)
00389 {
00390     size_t newlen;
00391     char *unescaped = (char *)PQunescapeBytea((unsigned char *)str, &newlen);
00392     cp_string *res = cp_string_create(unescaped, newlen);
00393     PQfreemem(unescaped);
00394     return res;
00395 }
00396     
00397 static int fetch_rows(cp_result_set *result_set, PGresult *res)
00398 {
00399     int i, j;
00400     cp_list *list = result_set->results;
00401     cp_vector *rec;
00402     cp_field_type *type; 
00403 
00404     for (i = 0; i < result_set->row_count; i++)
00405     {
00406         /* check for chunked reading */
00407         if (!result_set->connection->read_result_set_at_once 
00408                 && result_set->connection->fetch_size != 0 
00409                 && i == result_set->connection->fetch_size) break;
00410 
00411         if (result_set->binary)
00412             rec = cp_vector_create(result_set->field_count);
00413         else
00414             rec = cp_vector_create_by_option(result_set->field_count, 
00415                 COLLECTION_MODE_DEEP, 
00416                 NULL, (cp_destructor_fn) cp_string_destroy);
00417 
00418         for (j = 0; j < result_set->field_count; j++)
00419         {
00420             if (!PQgetisnull(res, i, j))
00421             {
00422                 type = cp_vector_element_at(result_set->field_types, j);
00423                 if (result_set->binary)
00424                 {
00425                     switch (*type)
00426                     {
00427                         case CP_FIELD_TYPE_BOOLEAN:
00428                         {
00429                             char *b = calloc(1, sizeof(short));
00430                             short *p = (short *) b;
00431                             memcpy(b, PQgetvalue(res, i, j), 1);
00432                             *p = (short) *b;
00433                             cp_vector_set_element(rec, j, p);
00434                         }
00435                         break;
00436 
00437                         case CP_FIELD_TYPE_SHORT:
00438                         {
00439                             short *p = malloc(sizeof(short));
00440                             memcpy(p, PQgetvalue(res, i, j), sizeof(short));
00441 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
00442                             *p = ntohs(*p);
00443 #endif /* CP_BYTE_ORDER_LITTLE_ENDIAN */
00444                             cp_vector_set_element(rec, j, p);
00445                         }
00446                         break;
00447 
00448                         case CP_FIELD_TYPE_INT:
00449                         {
00450                             int *p = malloc(sizeof(int));
00451                             memcpy(p, PQgetvalue(res, i, j), sizeof(int));
00452 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
00453                             *p = ntohl(*p);
00454 #endif /* CP_BYTE_ORDER_LITTLE_ENDIAN */
00455                             cp_vector_set_element(rec, j, p);
00456                         }
00457                         break;
00458 
00459                         case CP_FIELD_TYPE_LONG:
00460                         {
00461                             long *p = malloc(sizeof(long));
00462                             memcpy(p, PQgetvalue(res, i, j), sizeof(long));
00463 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
00464                             if (sizeof(long) == sizeof(int))
00465                                 *p = ntohl(*p);
00466 #endif /* CP_BYTE_ORDER_LITTLE_ENDIAN */
00467                             cp_vector_set_element(rec, j, p);
00468                         }
00469                         break;
00470 
00471                         case CP_FIELD_TYPE_FLOAT:
00472                         {
00473                             float *p = calloc(1, sizeof(float));
00474 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
00475                             int *q = (int *) p;
00476 #endif /* CP_BYTE_ORDER_LITTLE_ENDIAN */
00477                             memcpy(p, PQgetvalue(res, i, j), sizeof(float));
00478 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
00479                             *q = ntohl(*q);
00480 #endif /* CP_BYTE_ORDER_LITTLE_ENDIAN */
00481                             cp_vector_set_element(rec, j, p);
00482                         }
00483                         break;
00484 
00485                         case CP_FIELD_TYPE_DOUBLE:
00486                         {
00487                             double *p = malloc(sizeof(double));
00488 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
00489                             int *q = (int *) p;
00490                             int w;
00491 #endif /* CP_BYTE_ORDER_LITTLE_ENDIAN */
00492                             memcpy(p, PQgetvalue(res, i, j), sizeof(double));
00493 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN 
00494                             /* believe it or not */
00495                             q[0] = ntohl(q[0]);
00496                             q[1] = ntohl(q[1]);
00497                             w = q[0];
00498                             q[0] = q[1];
00499                             q[1] = w;
00500 #endif /* CP_BYTE_ORDER_LITTLE_ENDIAN */
00501                             cp_vector_set_element(rec, j, p);
00502                         }
00503                         break;
00504 
00505                         case CP_FIELD_TYPE_TIME:
00506                         case CP_FIELD_TYPE_DATE:
00507                         case CP_FIELD_TYPE_TIMESTAMP:
00508                         {
00509                             union 
00510                             {
00511                                 double p;
00512                                 int q[2];
00513                             } up;
00514 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN 
00515                             int w;
00516 #endif /* CP_BYTE_ORDER_LITTLE_ENDIAN */
00517                             cp_timestampz *t = calloc(1, sizeof(cp_timestampz));
00518                             memcpy(&up.p, PQgetvalue(res, i, j), sizeof(double));
00519 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN 
00520                             up.q[0] = ntohl(up.q[0]);
00521                             up.q[1] = ntohl(up.q[1]);
00522                             w = up.q[0];
00523                             up.q[0] = up.q[1];
00524                             up.q[1] = w;
00525 #endif /* CP_BYTE_ORDER_LITTLE_ENDIAN */
00526                             up.p += (30 * 365 + 7) * 24 * 3600;
00527                             t->tm.tv_sec = up.p;
00528                             t->tm.tv_usec = (up.p - t->tm.tv_sec) * 1000000;
00529                             cp_vector_set_element(rec, j, t);
00530                         }
00531                         break;
00532                         
00533                         default:
00534                             cp_vector_set_element(rec, j, 
00535                                 cp_string_create(PQgetvalue(res, i, j), 
00536                                                  PQgetlength(res, i, j)));
00537                             break;
00538                     }
00539                 }
00540                 else if (*type == CP_FIELD_TYPE_BLOB)
00541                     cp_vector_set_element(rec, j, 
00542                         pgsql_unescape_binary(NULL, PQgetvalue(res, i, j)));
00543                 else
00544                     cp_vector_set_element(rec, j, 
00545                             cp_string_create(PQgetvalue(res, i, j), 
00546                                              PQgetlength(res, i, j)));
00547             }
00548         }
00549 
00550         cp_list_append(list, rec);
00551     }
00552 
00553     if (i == result_set->row_count) 
00554     {
00555         result_set->fetch_complete = 1;
00556         PQclear(res);
00557         result_set->source = NULL;
00558     }
00559             
00560     return i;
00561 }
00562 
00563 static cp_result_set *pgsql_select(cp_db_connection *_conn, char *query)
00564 {
00565     int rc;
00566     PGresult *res = NULL;
00567     PGconn *conn = (PGconn *) _conn->connection;
00568     cp_result_set *result_set = NULL;
00569 
00570     rc = PQsendQuery(conn, query);
00571     while (PQflush(conn) == 1);
00572     if (rc == 0) 
00573     {
00574         cp_error(CP_DBMS_QUERY_FAILED, "%s", query);
00575         return NULL;
00576     }
00577 
00578     if ((res = PQgetResult(conn)) != NULL)
00579     {
00580         result_set = create_result_set(_conn, res);
00581         do 
00582         {
00583             if (pgsql_result_error(res)) return NULL;
00584             fetch_rows(result_set, res);
00585             /* if doing chunked reading, it's time to break */
00586             if (!_conn->read_result_set_at_once) break;
00587         }
00588         while ((res = PQgetResult(conn)) != NULL);
00589     }
00590     else while (PQflush(conn) == 1);
00591 
00592     return result_set;
00593 }
00594 
00595 static int convert_field_type(int field_type)
00596 {
00597     int *type = cp_hashtable_get(pgsql_field_type_map, &field_type);
00598     return type != NULL ? *type : field_type;
00599 }
00600 
00601 static int pgsql_fetch_metadata(cp_result_set *result_set)
00602 {
00603     int rc = -1;
00604     int j;
00605 
00606     if (result_set->field_headers == NULL)
00607     {
00608         PGresult *res = result_set->source;
00609         result_set->field_headers = 
00610             cp_vector_create_by_option(result_set->field_count, 
00611                                        COLLECTION_MODE_NOSYNC | 
00612                                        COLLECTION_MODE_COPY | 
00613                                        COLLECTION_MODE_DEEP |
00614                                        COLLECTION_MODE_MULTIPLE_VALUES, 
00615                                        (cp_copy_fn) strdup, 
00616                                        (cp_destructor_fn) free);
00617         result_set->field_types = 
00618             cp_vector_create_by_option(result_set->field_count, 
00619                                        COLLECTION_MODE_NOSYNC | 
00620                                        COLLECTION_MODE_COPY | 
00621                                        COLLECTION_MODE_DEEP |
00622                                        COLLECTION_MODE_MULTIPLE_VALUES,
00623                                        (cp_copy_fn) intdup, 
00624                                        (cp_destructor_fn) free);
00625         for (j = 0; j < result_set->field_count; j++)
00626         {
00627             int type = convert_field_type(PQftype(result_set->source, j));
00628             cp_vector_set_element(result_set->field_headers, j, 
00629                                   PQfname(res, j));
00630             cp_vector_set_element(result_set->field_types, j, &type);
00631         }
00632         rc = 0;
00633     }
00634 
00635     return rc;
00636 }
00637 
00638 static int pgsql_fetch_next(cp_result_set *result_set)
00639 {
00640     int i, j;
00641     PGresult *res = result_set->source;
00642     PGconn *conn = (PGconn *) result_set->connection->connection;
00643     cp_vector *rec;
00644 
00645     while (res)
00646     {
00647         if (pgsql_result_error(res)) return -1;
00648 
00649         for (i = result_set->position; i < result_set->row_count; i++)
00650         {
00651             if (((i + 1) % result_set->connection->fetch_size) == 0) break;
00652 
00653             rec = cp_vector_create_by_option(result_set->field_count, 
00654                     COLLECTION_MODE_DEEP, 
00655                     NULL, (cp_destructor_fn) cp_string_destroy);
00656 
00657             for (j = 0; j < result_set->field_count; j++)
00658             {
00659                 if (!PQgetisnull(res, i, j))
00660                     cp_vector_set_element(rec, j, 
00661                         cp_string_create(PQgetvalue(res, i, j), 
00662                                          PQgetlength(res, i, j)));
00663             }
00664 
00665             cp_list_append(result_set->results, rec);
00666         }
00667 
00668         if (i == result_set->row_count) 
00669         {
00670             result_set->fetch_complete = 1;
00671             PQclear(res);
00672             result_set->source = NULL;
00673         }
00674         else /* read chunk, return */
00675             break;
00676 
00677         res = PQgetResult(conn);
00678     }
00679 
00680     return 0;
00681 }
00682 
00683 static int pgsql_release_result_set(cp_result_set *result_set)
00684 {
00685     if (result_set->source)
00686     {
00687         PGresult *res = result_set->source;
00688         PGconn *conn = (PGconn *) result_set->connection->connection;
00689         do
00690         {
00691             PQclear(res);
00692         } while ((res = PQgetResult(conn)) != NULL);
00693     }
00694 
00695     return 0;
00696 }
00697 
00698 static int pgsql_update(cp_db_connection *_conn, char *query)
00699 {
00700     int rows = 0;
00701     PGresult *res = NULL;
00702     PGconn *conn = (PGconn*) _conn->connection;
00703     int rc;
00704 
00705     rc = PQsendQuery(conn, query);
00706     while (PQflush(conn) == 1);
00707     if (rc == 0) 
00708     {
00709         cp_error(CP_DBMS_QUERY_FAILED, "%s", query);
00710         return -1;
00711     }
00712 
00713     while ((res = PQgetResult(conn)) != NULL)
00714     {
00715         switch (PQresultStatus(res)) 
00716         {
00717             case PGRES_BAD_RESPONSE:
00718             case PGRES_NONFATAL_ERROR:
00719             case PGRES_FATAL_ERROR:
00720                 cp_error(CP_DBMS_QUERY_FAILED, "postgres: %s", query);
00721                 cp_error(CP_DBMS_QUERY_FAILED, 
00722                          "postgres: %s", PQresultErrorMessage(res));
00723                 PQclear(res);
00724                 return -1;
00725 
00726             default:
00727                 rows += atoi(PQcmdTuples(res));
00728                 break;
00729         }
00730         PQclear(res);
00731     }
00732 
00733     return rows;
00734 }
00735 
00736 static char *pgsql_escape_string(cp_db_connection *conn, char *src, size_t len)
00737 {
00738     char *res = malloc(len * 2 + 1); /* postgres spec */
00739     PQescapeString(res, src, len);
00740     return res;
00741 }
00742 
00743 static char *pgsql_escape_binary(cp_db_connection *conn, char *src, 
00744                                  size_t len, size_t *res_len)
00745 {
00746     return (char *)PQescapeBytea((unsigned char *)src, len, res_len);
00747 }
00748 
00749 //~~ magic numbers are postgres OIDs - maybe include 
00750 //~~ postgresql/server/catalog/pg_type.h instead
00751 static Oid cprops2pgsql(cp_field_type type)
00752 {
00753     switch(type)
00754     {
00755         case CP_FIELD_TYPE_BOOLEAN: return 16;
00756         case CP_FIELD_TYPE_CHAR: return 1043; // 18
00757         case CP_FIELD_TYPE_SHORT:return 21;
00758         case CP_FIELD_TYPE_INT: return 23;
00759         case CP_FIELD_TYPE_LONG: return sizeof(long) == sizeof(int) ? 23 : 20;
00760         case CP_FIELD_TYPE_LONG_LONG: return 20;
00761         case CP_FIELD_TYPE_FLOAT: return 700;
00762         case CP_FIELD_TYPE_DOUBLE: return 701;
00763         case CP_FIELD_TYPE_VARCHAR: return 1043;
00764         case CP_FIELD_TYPE_BLOB: return 17;
00765         case CP_FIELD_TYPE_DATE: return 1114; // 1082
00766         case CP_FIELD_TYPE_TIME: return 1083;
00767         case CP_FIELD_TYPE_TIMESTAMP: return 1114;
00768         default: return 2276;
00769     }
00770 
00771     return 2276;
00772 }
00773 
/**
 * Rewrite '?' parameter placeholders as the numbered markers postgres 
 * expects ($1, $2, ...).
 *
 * Every '?' in the query is replaced, in order of appearance, regardless of
 * where it occurs (the query text is not parsed).
 *
 * @param query     NUL terminated query text
 * @param prm_count number of parameters; 0 means no substitution is attempted
 * @return query itself if prm_count is 0, if the query contains no '?' or if 
 *         memory for the rewritten text could not be allocated; otherwise a 
 *         newly malloc'ed string which the caller releases with free().
 */
char *format_prm(char *query, int prm_count)
{
    /* longest marker: '$' followed by the 10 decimal digits of a 32 bit int */
    enum { MARKER_LEN_MAX = 11 };
    size_t markers = 0;
    char *scan;
    char *subst;
    char *out;
    int count = 1;

    if (query == NULL || prm_count == 0) return query;

    for (scan = strchr(query, '?'); scan != NULL; scan = strchr(scan + 1, '?'))
        markers++;

    if (markers == 0) return query;

    /* each one byte '?' grows to at most MARKER_LEN_MAX bytes */
    subst = malloc(strlen(query) + markers * (MARKER_LEN_MAX - 1) + 1);
    if (subst == NULL) return query;

    out = subst;
    for (scan = query; *scan != '\0'; scan++)
    {
        if (*scan == '?')
            /* cannot overflow - the buffer is sized for the widest marker */
            out += sprintf(out, "$%d", count++);
        else
            *out++ = *scan;
    }
    *out = '\0';

    return subst;
}
00813 
00814 static cp_db_statement *pgsql_prepare_statement(cp_db_connection *conn, 
00815                                                 int prm_count, 
00816                                                 cp_field_type *prm_type, 
00817                                                 char *query)
00818 {
00819     cp_db_statement *statement = NULL;
00820     int i;
00821     PGresult *res;
00822     Oid *paramTypes = malloc(prm_count * sizeof(Oid));
00823     char *stmt = malloc(21);
00824     char *subst;
00825     ExecStatusType rc;
00826 
00827 #ifdef CP_HAS_STRLCPY
00828     strlcpy(stmt, "STMT", 21);
00829 #else
00830     strcpy(stmt, "STMT");
00831 #endif /* CP_HAS_STRLCPY */
00832     gen_id_str(&stmt[4]);
00833 
00834     for (i = 0; i < prm_count; i++)
00835         paramTypes[i] = cprops2pgsql(prm_type[i]);
00836     
00837     /* parameters may be given as ?, postgres expects $1, $2 etc */
00838     subst = format_prm(query, prm_count);
00839     res = PQprepare(conn->connection, stmt, subst, prm_count, paramTypes);
00840     free(paramTypes);
00841     if (subst != query) free(subst);
00842 
00843     if (res == NULL)
00844     {
00845         cp_error(CP_DBMS_CLIENT_ERROR, "%s - prepare statement: %s", 
00846                  conn->data_source->act->dbms_lit, 
00847                  PQerrorMessage(conn->connection));
00848         free(stmt);
00849     }
00850     else
00851     {
00852         rc = PQresultStatus(res);
00853         switch (rc)
00854         {
00855             case PGRES_EMPTY_QUERY:
00856             case PGRES_BAD_RESPONSE:
00857             case PGRES_FATAL_ERROR:
00858                 free(stmt);
00859                 stmt = NULL;
00860                 cp_error(CP_DBMS_CLIENT_ERROR, "%s - prepare statement: %s", 
00861                          conn->data_source->act->dbms_lit, 
00862                          PQresultErrorMessage(res));
00863                 break;
00864 
00865             case PGRES_NONFATAL_ERROR:
00866                 cp_warn("%s - prepare statement: %s", 
00867                         conn->data_source->act->dbms_lit, 
00868                         PQresultErrorMessage(res));
00869             default:
00870                 statement = cp_db_statement_instantiate(conn, prm_count, 
00871                                                         prm_type, stmt);
00872                 if (statement == NULL)
00873                 {
00874                     //~~ report error
00875                     free(stmt);
00876                 }
00877                 break;
00878             
00879         }
00880         PQclear(res);
00881     }
00882 
00883     if (statement)
00884         statement->store = memdup(prm_type, prm_count * sizeof(cp_field_type));
00885 
00886     return statement;
00887 }
00888 
00889 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
00890 
/**
 * Convert an 8 byte value between host and network byte order in place: the 
 * two int sized halves trade places and each half is passed through ntohl.
 *
 * @param ptr address of the 8 byte value to convert
 */
void hton8byte(void *ptr)
{
    int halves[2];
    int swapped[2];

    memcpy(halves, ptr, sizeof halves);
    swapped[0] = ntohl(halves[1]);
    swapped[1] = ntohl(halves[0]);
    memcpy(ptr, swapped, sizeof swapped);
}
00905     
00906     
00907 static void **to_network_order(cp_field_type *types, void **prm, int count)
00908 {
00909     int i;
00910     void **res = calloc(count, sizeof(void *));
00911     short *pshort;
00912     int *pint;
00913     long *plong;
00914     float *pfloat;
00915     double *pdouble;
00916     
00917     for (i = 0; i < count; i++)
00918     {
00919         switch (types[i])
00920         {
00921             case CP_FIELD_TYPE_SHORT:
00922                 pshort = malloc(sizeof(short));
00923                 memcpy(pshort, prm[i], sizeof(short));
00924                 *pshort = htons(*pshort);
00925                 res[i] = pshort;
00926                 break;
00927 
00928             case CP_FIELD_TYPE_INT:
00929                 pint = malloc(sizeof(int));
00930                 memcpy(pint, prm[i], sizeof(int));
00931                 *pint = htonl(*pint);
00932                 res[i] = pint;
00933                 break;
00934 
00935             case CP_FIELD_TYPE_LONG:
00936                 if (sizeof(long) == sizeof(int))
00937                 {
00938                     plong = malloc(sizeof(long));
00939                     memcpy(plong, prm[i], sizeof(long));
00940                     *plong = htonl(*plong);
00941                     res[i] = plong;
00942                     break;
00943                 }
00944 
00945             case CP_FIELD_TYPE_FLOAT:
00946                 pfloat = malloc(sizeof(float));
00947                 memcpy(pfloat, prm[i], sizeof(float));
00948                 plong = (long *) pfloat;
00949                 *plong = htonl(*plong);
00950                 res[i] = pfloat;
00951                 break;
00952 
00953             case CP_FIELD_TYPE_DOUBLE:
00954                 pdouble = malloc(sizeof(double));
00955                 memcpy(pdouble, prm[i], sizeof(double));
00956                 hton8byte(pdouble);
00957                 res[i] = pdouble;
00958                 break;
00959 
00960             case CP_FIELD_TYPE_TIME:
00961             case CP_FIELD_TYPE_DATE:
00962             case CP_FIELD_TYPE_TIMESTAMP:
00963                 res[i] = prm[i];
00964                 hton8byte(res[i]);
00965                 break;
00966 
00967             default:
00968                 res[i] = prm[i];
00969         }
00970     }
00971 
00972     return res;
00973 }
00974 
00975 void delete_prm(void **in_order_prm, cp_field_type *types, int count)
00976 {
00977     int i;
00978 
00979     for (i = 0; i < count; i++)
00980     {
00981         switch (types[i])
00982         {
00983             case CP_FIELD_TYPE_SHORT:
00984             case CP_FIELD_TYPE_INT:
00985             case CP_FIELD_TYPE_FLOAT:
00986             case CP_FIELD_TYPE_DOUBLE:
00987                 free(in_order_prm[i]);
00988                 break;
00989 
00990             case CP_FIELD_TYPE_LONG:
00991                 if (sizeof(long) == sizeof(int))
00992                 {
00993                     free(in_order_prm[i]);
00994                     break;
00995                 }
00996 
00997             default:
00998                 break;
00999         }
01000     }
01001 
01002     free(in_order_prm);
01003 }
01004 #endif /* CP_BYTE_ORDER_LITTLE_ENDIAN */
01005 
01006 static void *to_pgtime(cp_timestampz *tmz)
01007 {
01008     double *d = calloc(1, sizeof(double));
01009     *d = tmz->tm.tv_sec + ((double) tmz->tm.tv_usec) / 1e6
01010          + tmz->tz_minuteswest * 60 - ((30 * 365 + 7) * 86400);
01011     return d;
01012 }
01013 
01014 static void subst_cp2pgsql(int count, cp_field_type *types, void **prm, int *lengths)
01015 {
01016     int i;
01017     char *ch;
01018     
01019     for (i = 0; i < count; i++)
01020     {
01021         switch (types[i])
01022         {
01023             case CP_FIELD_TYPE_BOOLEAN:
01024                 ch = prm[i];
01025                 *ch = (prm[i] == 0 ? 'f' : 't');
01026                 lengths[i] = 1;
01027                 break;
01028 
01029             case CP_FIELD_TYPE_TIME:
01030             case CP_FIELD_TYPE_DATE:
01031             case CP_FIELD_TYPE_TIMESTAMP:
01032                 prm[i] = to_pgtime(prm[i]);
01033                 lengths[i] = sizeof(double);
01034                 break;
01035 
01036             default:
01037                 break;
01038         }
01039     }
01040 }
01041 
01042 static void free_cp2pgsql(int count, cp_field_type *types, void **prm)
01043 {
01044     int i;
01045     
01046     for (i = 0; i < count; i++)
01047     {
01048         if (types[i] == CP_FIELD_TYPE_TIME ||
01049             types[i] == CP_FIELD_TYPE_DATE ||
01050             types[i] == CP_FIELD_TYPE_TIMESTAMP)
01051         {
01052             free(prm[i]);
01053         }
01054     }
01055 }
01056 
01057 static int pgsql_execute_statement(cp_db_connection *conn, 
01058                                    cp_db_statement *stmt, 
01059                                    cp_result_set **result_set, 
01060                                    int *lengths, 
01061                                    void **prm)
01062 {
01063     int *fmt = NULL;
01064     cp_result_set *res = NULL;
01065     PGresult *pgres = NULL;
01066     ExecStatusType ret;
01067     int i;
01068     int rc = 0;
01069     cp_field_type *types = NULL;
01070 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
01071     void **in_order_prm = prm;
01072 #endif /* CP_BYTE_ORDER_LITTLE_ENDIAN */
01073 
01074     subst_cp2pgsql(stmt->prm_count, stmt->store, prm, lengths);
01075 
01076     if (lengths)
01077     {
01078         types = stmt->store;
01079         fmt = calloc(stmt->prm_count, sizeof(int));
01080         for (i = 0; i < stmt->prm_count; i++) fmt[i] = 1;
01081 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
01082         in_order_prm = to_network_order(types, prm, stmt->prm_count);
01083 #endif /* CP_BYTE_ORDER_LITTLE_ENDIAN */
01084     }
01085 
01086     pgres = PQexecPrepared(conn->connection, (char *) stmt->source,
01087                            stmt->prm_count,
01088 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
01089                            (const char * const *) in_order_prm,
01090 #else
01091                            (const char * const *) prm,
01092 #endif /* CP_BYTE_ORDER_LITTLE_ENDIAN */                           
01093                            lengths, fmt, stmt->binary);
01094     free_cp2pgsql(stmt->prm_count, stmt->store, prm);
01095     if (fmt) free(fmt);
01096 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
01097     if (in_order_prm != prm) delete_prm(in_order_prm, types, stmt->prm_count);
01098 #endif /* CP_BYTE_ORDER_LITTLE_ENDIAN */
01099     
01100     ret = PQresultStatus(pgres);
01101     switch (ret)
01102     {
01103         case PGRES_TUPLES_OK:
01104             res = create_result_set(conn, pgres);
01105             res->binary = stmt->binary;
01106             fetch_rows(res, pgres);
01107             break;
01108 
01109         case PGRES_COMMAND_OK:
01110             rc = atoi(PQcmdTuples(pgres));
01111             PQclear(pgres);
01112             break;
01113 
01114         case PGRES_EMPTY_QUERY:
01115         case PGRES_BAD_RESPONSE:
01116         case PGRES_FATAL_ERROR:
01117         case PGRES_NONFATAL_ERROR:
01118         default:
01119             rc = -1;
01120             PQclear(pgres);
01121             cp_error(CP_DBMS_QUERY_FAILED, "%s - prepared statement: %s", 
01122                      conn->data_source->act->dbms_lit, 
01123                      PQresultErrorMessage(pgres));
01124             break;
01125     }
01126 
01127     if (result_set)
01128         *result_set = res;
01129 
01130     return rc;
01131 }
01132 
01133 static void pgsql_release_statement(cp_db_statement *statement)
01134 {
01135     if (statement->source) free(statement->source);
01136     if (statement->store) free(statement->store);
01137 }
01138 
01139 static void pgsql_set_autocommit(cp_db_connection *conn, int state)
01140 {
01141     if (state == 0)
01142         pgsql_update(conn, "begin;");
01143 }
01144 
01145 static int pgsql_commit(cp_db_connection *conn)
01146 {
01147     int rc = pgsql_update(conn, "commit;");
01148     if (conn->autocommit == 0)
01149         pgsql_update(conn, "begin;");
01150     return rc;
01151 }
01152 
01153 static int pgsql_rollback(cp_db_connection *conn)
01154 {
01155     int rc = pgsql_update(conn, "rollback;");
01156     if (conn->autocommit == 0)
01157         pgsql_update(conn, "begin;");
01158     return rc;
01159 }
01160 
/* 
 * guarded by _WINDOWS - the macro the rest of this file tests and the one 
 * named on the closing #endif.
 */
#ifdef _WINDOWS
/**
 * DLL entry point. No work is done for any of the notifications.
 *
 * @return TRUE in all cases
 */
BOOL APIENTRY DllMain( HANDLE hModule, 
                       DWORD  ul_reason_for_call, 
                       LPVOID lpReserved
                     )
{
    switch (ul_reason_for_call)
    {
        case DLL_PROCESS_ATTACH:
        case DLL_THREAD_ATTACH:
        case DLL_THREAD_DETACH:
        case DLL_PROCESS_DETACH:
            break;
    }
    return TRUE;
}
#endif /* _WINDOWS */

Generated on Mon Dec 5 23:00:21 2011 for cprops by  doxygen 1.4.7