00001 #include <libpq-fe.h>
00002 #include <stdlib.h>
00003 #include <string.h>
00004 #include <stdarg.h>
00005 #ifdef CP_HAS_NETINET_IN_H
00006 #include <netinet/in.h>
00007 #endif
00008 #ifdef _WINDOWS
00009 #include "socket.h"
00010 #endif
00011 #include <time.h>
00012 #include <stdio.h>
00013
00014 #include "config.h"
00015 #include "common.h"
00016 #include "db.h"
00017 #include "str.h"
00018 #include "util.h"
00019 #include "hashtable.h"
00020 #include "vector.h"
00021 #include "log.h"
00022
00023 #include "db_postgres.h"
00024
00025 static cp_db_connection *pgsql_open_conn(cp_data_source *data_source);
00026 static cp_result_set *pgsql_select(cp_db_connection *_conn, char *query);
00027 static int pgsql_fetch_metadata(cp_result_set *result_set);
00028 static int pgsql_fetch_next(cp_result_set *result_set);
00029 static int pgsql_release_result_set(cp_result_set *result_set);
00030 static int pgsql_update(cp_db_connection *_conn, char *query);
00031 static int pgsql_close_conn(cp_db_connection *connection);
00032 static char *pgsql_escape_string(cp_db_connection *conn, char *str, int len);
00033 static char *pgsql_escape_binary(cp_db_connection *conn, char *str,
00034 int len, int *res_len);
00035 static cp_string *pgsql_unescape_binary(cp_db_connection *conn, char *str);
00036 static cp_db_statement *
00037 pgsql_prepare_statement(cp_db_connection *conn, int prm_count,
00038 cp_field_type *prm_type, char *query);
00039 static int pgsql_execute_statement(cp_db_connection *conn, cp_db_statement *stmt,
00040 cp_result_set **result_set, int *lengths,
00041 void **prm);
00042 static void pgsql_release_statement(cp_db_statement *statement);
00043 static void pgsql_set_autocommit(cp_db_connection *conn, int state);
00044 static int pgsql_commit(cp_db_connection *conn);
00045 static int pgsql_rollback(cp_db_connection *conn);
00046
00047 volatile static int initialized = 0;
00048 static cp_db_actions *pgsql_db_actions = NULL;
00049 static cp_hashtable *pgsql_field_type_map = NULL;
00050
00051 #define DBNAME "postgres"
00052
00053 typedef struct _field_type_map
00054 {
00055 int pgsql_field_code;
00056 int cprops_field_code;
00057 } field_type_map;
00058
00059 field_type_map pgsql2cprops[] =
00060 {
00061 { 16, CP_FIELD_TYPE_BOOLEAN },
00062 { 17, CP_FIELD_TYPE_BLOB },
00063 { 18, CP_FIELD_TYPE_CHAR },
00064 { 19, CP_FIELD_TYPE_VARCHAR },
00065 { 20, CP_FIELD_TYPE_LONG_LONG },
00066 { 21, CP_FIELD_TYPE_SHORT },
00067
00068 { 23, CP_FIELD_TYPE_INT },
00069 { 25, CP_FIELD_TYPE_VARCHAR },
00070 { 26, CP_FIELD_TYPE_INT },
00071 { 602, CP_FIELD_TYPE_VARCHAR },
00072 { 700, CP_FIELD_TYPE_FLOAT },
00073 { 701, CP_FIELD_TYPE_DOUBLE },
00074 { 702, CP_FIELD_TYPE_DATE },
00075 { 703, CP_FIELD_TYPE_TIME },
00076 { 704, CP_FIELD_TYPE_TIME },
00077
00078
00079 { 829, CP_FIELD_TYPE_VARCHAR },
00080 { 869, CP_FIELD_TYPE_VARCHAR },
00081
00082 { 1000, CP_FIELD_TYPE_BOOLEAN },
00083 { 1001, CP_FIELD_TYPE_VARCHAR },
00084 { 1002, CP_FIELD_TYPE_VARCHAR },
00085 { 1003, CP_FIELD_TYPE_VARCHAR },
00086 { 1005, CP_FIELD_TYPE_SHORT },
00087
00088 { 1007, CP_FIELD_TYPE_INT },
00089 { 1009, CP_FIELD_TYPE_VARCHAR },
00090 { 1014, CP_FIELD_TYPE_VARCHAR },
00091 { 1015, CP_FIELD_TYPE_VARCHAR },
00092 { 1016, CP_FIELD_TYPE_LONG_LONG },
00093 { 1021, CP_FIELD_TYPE_FLOAT },
00094 { 1022, CP_FIELD_TYPE_DOUBLE },
00095 { 1023, CP_FIELD_TYPE_DATE },
00096 { 1024, CP_FIELD_TYPE_TIME },
00097 { 1025, CP_FIELD_TYPE_TIME },
00098 { 1033, CP_FIELD_TYPE_VARCHAR },
00099 { 1034, CP_FIELD_TYPE_VARCHAR },
00100 { 1040, CP_FIELD_TYPE_VARCHAR },
00101 { 1041, CP_FIELD_TYPE_VARCHAR },
00102
00103 { 1042, CP_FIELD_TYPE_VARCHAR },
00104 { 1043, CP_FIELD_TYPE_VARCHAR },
00105 { 1082, CP_FIELD_TYPE_DATE },
00106 { 1083, CP_FIELD_TYPE_TIME },
00107 { 1114, CP_FIELD_TYPE_TIMESTAMP },
00108 { 1115, CP_FIELD_TYPE_TIMESTAMP },
00109 { 1182, CP_FIELD_TYPE_DATE },
00110 { 1183, CP_FIELD_TYPE_TIME },
00111 { 1184, CP_FIELD_TYPE_TIMESTAMP },
00112 { 1185, CP_FIELD_TYPE_TIMESTAMP },
00113 { 1186, CP_FIELD_TYPE_TIME },
00114 { 1187, CP_FIELD_TYPE_TIME },
00115 { 1231, CP_FIELD_TYPE_INT },
00116 { 1266, CP_FIELD_TYPE_TIME },
00117 { 1270, CP_FIELD_TYPE_TIME },
00118 { 1560, CP_FIELD_TYPE_BOOLEAN },
00119 { 1561, CP_FIELD_TYPE_BOOLEAN },
00120 { 1562, CP_FIELD_TYPE_LONG },
00121 { 1563, CP_FIELD_TYPE_LONG },
00122 { 1700, CP_FIELD_TYPE_LONG },
00123 { 2275, CP_FIELD_TYPE_VARCHAR },
00124 { 2276, CP_FIELD_TYPE_VARCHAR },
00125 { 10635, CP_FIELD_TYPE_LONG },
00126 { 10637, CP_FIELD_TYPE_VARCHAR }
00127 };
00128
00129 void cp_pgsql_shutdown()
00130 {
00131 if (!initialized) return;
00132 initialized = 0;
00133
00134 if (pgsql_db_actions)
00135 {
00136 cp_db_actions_destroy(pgsql_db_actions);
00137 pgsql_db_actions = NULL;
00138 }
00139
00140 if (pgsql_field_type_map)
00141 {
00142 cp_hashtable_destroy(pgsql_field_type_map);
00143 pgsql_field_type_map = NULL;
00144 }
00145 }
00146
00147 void cp_dbms_postgres_init()
00148 {
00149 int id;
00150 int i, types;
00151 if (initialized) return;
00152 initialized = 1;
00153
00154 #ifdef _WINDOWS
00155
00156 cp_socket_init();
00157 #endif
00158
00159 id = cp_db_register_dbms(DBNAME, cp_pgsql_shutdown);
00160
00161 pgsql_db_actions =
00162 cp_db_actions_create(id, DBNAME, pgsql_open_conn, pgsql_select,
00163 pgsql_fetch_metadata, pgsql_fetch_next,
00164 pgsql_release_result_set, pgsql_update, pgsql_close_conn,
00165 pgsql_escape_string, pgsql_escape_binary,
00166 pgsql_unescape_binary, pgsql_prepare_statement,
00167 pgsql_execute_statement, pgsql_release_statement,
00168 pgsql_set_autocommit, pgsql_commit, pgsql_rollback);
00169
00170 types = sizeof(pgsql2cprops) / sizeof(field_type_map);
00171 pgsql_field_type_map =
00172 cp_hashtable_create_by_mode(COLLECTION_MODE_NOSYNC, types,
00173 cp_hash_int, cp_hash_compare_int);
00174 for (i = 0; i < types; i++)
00175 cp_hashtable_put(pgsql_field_type_map,
00176 &pgsql2cprops[i].pgsql_field_code,
00177 &pgsql2cprops[i].cprops_field_code);
00178 }
00179
00180 static void
00181 cp_pgsql_connection_parameters_delete(cp_pgsql_connection_parameters *prm)
00182 {
00183 if (prm)
00184 {
00185 if (prm->host) free(prm->host);
00186 if (prm->login) free(prm->login);
00187 if (prm->password) free(prm->password);
00188 if (prm->db_name) free(prm->db_name);
00189 if (prm->options) free(prm->options);
00190 if (prm->sslmode) free(prm->sslmode);
00191 if (prm->krbsrvname) free(prm->krbsrvname);
00192 if (prm->service) free(prm->service);
00193 }
00194 }
00195
00196 cp_data_source *
00197 cp_postgres_data_source(char *host,
00198 int port,
00199 char *login,
00200 char *password,
00201 char *db_name,
00202 char *options,
00203 char *sslmode,
00204 char *krbsrvname,
00205 char *service)
00206 {
00207 cp_data_source *data_source;
00208 cp_pgsql_connection_parameters *impl;
00209
00210 if (!initialized) cp_dbms_postgres_init();
00211
00212 data_source = cp_calloc(1, sizeof(cp_data_source));
00213 if (data_source == NULL) return NULL;
00214
00215 data_source->act = pgsql_db_actions;
00216
00217 data_source->prm = cp_calloc(1, sizeof(cp_db_connection_parameters));
00218 if (data_source->prm == NULL)
00219 {
00220 cp_free(data_source);
00221 return NULL;
00222 }
00223 impl = calloc(1, sizeof(cp_pgsql_connection_parameters));
00224 if (impl == NULL)
00225 {
00226 cp_free(data_source->prm);
00227 cp_free(data_source);
00228 return NULL;
00229 }
00230
00231 impl->host = host ? strdup(host) : NULL;
00232 impl->port = port;
00233 impl->login = login ? strdup(login) : NULL;
00234 impl->password = password ? strdup(password) : NULL;
00235 impl->db_name = db_name ? strdup(db_name) : NULL;
00236 impl->options = options ? strdup(options) : NULL;
00237 impl->sslmode = sslmode ? strdup(sslmode) : NULL;
00238 impl->krbsrvname = krbsrvname ? strdup(krbsrvname) : NULL;
00239 impl->service = service ? strdup(service) : NULL;
00240
00241 data_source->prm->impl = impl;
00242 data_source->prm->impl_dtr =
00243 (cp_destructor_fn) cp_pgsql_connection_parameters_delete;
00244
00245 return data_source;
00246 }
00247
00248 cp_data_source *
00249 cp_dbms_postgres_get_data_source(char *host,
00250 int port,
00251 char *login,
00252 char *password,
00253 char *dbname)
00254 {
00255 return cp_postgres_data_source(host, port, login, password, dbname,
00256 NULL, NULL, NULL, NULL);
00257 }
00258
00259 cp_data_source *
00260 cp_dbms_postgres_get_data_source_prm(char *host,
00261 int port,
00262 char *login,
00263 char *password,
00264 char *dbname,
00265 cp_hashtable *prm)
00266 {
00267 return cp_postgres_data_source(host, port, login, password, dbname,
00268 cp_hashtable_get(prm, "options"),
00269 cp_hashtable_get(prm, "sslmode"),
00270 cp_hashtable_get(prm, "krbsrvname"),
00271 cp_hashtable_get(prm, "service"));
00272 }
00273
00274 static int pgsql_close_conn(cp_db_connection *connection)
00275 {
00276 void *conn = connection->connection;
00277 if (conn == NULL) return -1;
00278 PQfinish(conn);
00279 return 0;
00280 }
00281
00282 void cs_append(cp_string *cs, char *title, char *value)
00283 {
00284 cp_string_cstrcat(cs, " ");
00285 cp_string_cstrcat(cs, title);
00286 cp_string_cstrcat(cs, "=");
00287 cp_string_cstrcat(cs, value);
00288 }
00289
00290 static cp_db_connection *pgsql_open_conn(cp_data_source *data_source)
00291 {
00292 PGconn *conn = NULL;
00293 cp_string *cs = cp_string_create("", 0);
00294 cp_pgsql_connection_parameters *conf = data_source->prm->impl;
00295 cp_db_connection *res;
00296
00297 if (conf->host) cs_append(cs, "host", conf->host);
00298 if (conf->port)
00299 {
00300 char buf[32];
00301 #ifdef CP_HAS_SNPRINTF
00302 snprintf(buf, 32, " port=%d", conf->port);
00303 #else
00304 sprintf(buf, " port=%d", conf->port);
00305 #endif
00306 cp_string_cstrcat(cs, buf);
00307 }
00308 if (conf->login) cs_append(cs, "user", conf->login);
00309 if (conf->password) cs_append(cs, "password", conf->password);
00310 if (conf->db_name) cs_append(cs, "dbname", conf->db_name);
00311 if (conf->options) cs_append(cs, "options", conf->options);
00312 if (conf->sslmode) cs_append(cs, "sslmode", conf->sslmode);
00313 if (conf->krbsrvname) cs_append(cs, "krbsrvname", conf->krbsrvname);
00314 if (conf->service) cs_append(cs, "service", conf->service);
00315
00316 conn = PQconnectdb(cp_string_tocstr(cs));
00317
00318 cp_string_delete(cs);
00319
00320 if (conn == NULL) return NULL;
00321
00322 if (PQstatus(conn) == CONNECTION_BAD)
00323 {
00324 cp_error(CP_DBMS_CONNECTION_FAILURE,
00325 "PGSQL: connection to database %s failed", conf->db_name);
00326 cp_error(CP_DBMS_CONNECTION_FAILURE,
00327 "PGSQL: %s", PQerrorMessage(conn));
00328 goto failed;
00329 }
00330
00331 #ifdef __TRACE__
00332 cp_info("PGSQL: Connected to server at %s", conf->host);
00333 #endif
00334
00335
00336 res = cp_calloc(1, sizeof(cp_db_connection));
00337 res->data_source = data_source;
00338 res->connection = conn;
00339 res->read_result_set_at_once = 1;
00340 res->autocommit = 1;
00341 return res;
00342
00343 failed:
00344 PQfinish(conn);
00345 return NULL;
00346 }
00347
00348 static int pgsql_result_error(PGresult *res)
00349 {
00350 switch(PQresultStatus(res))
00351 {
00352 case PGRES_EMPTY_QUERY:
00353 case PGRES_BAD_RESPONSE:
00354 case PGRES_NONFATAL_ERROR:
00355 case PGRES_FATAL_ERROR:
00356 cp_error(1, "PGSQL: %s", PQresultErrorMessage(res));
00357 PQclear(res);
00358 return 1;
00359
00360 default:
00361 break;
00362 }
00363
00364 return 0;
00365 }
00366
00367 static cp_result_set *
00368 create_result_set(cp_db_connection *_conn, PGresult *res)
00369 {
00370 cp_result_set *result_set = cp_calloc(1, sizeof(cp_result_set));
00371 result_set->field_count = PQnfields(res);
00372 result_set->row_count = PQntuples(res);
00373 result_set->results = cp_list_create();
00374 result_set->connection = _conn;
00375 result_set->source = res;
00376
00377
00378
00379 pgsql_fetch_metadata(result_set);
00380
00381 return result_set;
00382 }
00383
00384 cp_string *pgsql_unescape_binary(cp_db_connection *conn, char *str)
00385 {
00386 int newlen;
00387 char *unescaped = PQunescapeBytea(str, &newlen);
00388 cp_string *res = cp_string_create(unescaped, newlen);
00389 PQfreemem(unescaped);
00390 return res;
00391 }
00392
00393 static int fetch_rows(cp_result_set *result_set, PGresult *res)
00394 {
00395 int i, j;
00396 cp_list *list = result_set->results;
00397 cp_vector *rec;
00398 cp_field_type *type;
00399
00400 for (i = 0; i < result_set->row_count; i++)
00401 {
00402
00403 if (!result_set->connection->read_result_set_at_once
00404 && result_set->connection->fetch_size != 0
00405 && i == result_set->connection->fetch_size) break;
00406
00407 if (result_set->binary)
00408 rec = cp_vector_create(result_set->field_count);
00409 else
00410 rec = cp_vector_create_by_option(result_set->field_count,
00411 COLLECTION_MODE_DEEP,
00412 NULL, (cp_destructor_fn) cp_string_destroy);
00413
00414 for (j = 0; j < result_set->field_count; j++)
00415 {
00416 if (!PQgetisnull(res, i, j))
00417 {
00418 type = cp_vector_element_at(result_set->field_types, j);
00419 if (result_set->binary)
00420 {
00421 switch (*type)
00422 {
00423 case CP_FIELD_TYPE_BOOLEAN:
00424 {
00425 char *b = calloc(1, sizeof(short));
00426 short *p = (short *) b;
00427 memcpy(b, PQgetvalue(res, i, j), 1);
00428 *p = (short) *b;
00429 cp_vector_set_element(rec, j, p);
00430 }
00431 break;
00432
00433 case CP_FIELD_TYPE_SHORT:
00434 {
00435 short *p = malloc(sizeof(short));
00436 memcpy(p, PQgetvalue(res, i, j), sizeof(short));
00437 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
00438 *p = ntohs(*p);
00439 #endif
00440 cp_vector_set_element(rec, j, p);
00441 }
00442 break;
00443
00444 case CP_FIELD_TYPE_INT:
00445 {
00446 int *p = malloc(sizeof(int));
00447 memcpy(p, PQgetvalue(res, i, j), sizeof(int));
00448 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
00449 *p = ntohl(*p);
00450 #endif
00451 cp_vector_set_element(rec, j, p);
00452 }
00453 break;
00454
00455 case CP_FIELD_TYPE_LONG:
00456 {
00457 long *p = malloc(sizeof(long));
00458 memcpy(p, PQgetvalue(res, i, j), sizeof(long));
00459 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
00460 if (sizeof(long) == sizeof(int))
00461 *p = ntohl(*p);
00462 #endif
00463 cp_vector_set_element(rec, j, p);
00464 }
00465 break;
00466
00467 case CP_FIELD_TYPE_FLOAT:
00468 {
00469 float *p = calloc(1, sizeof(float));
00470 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
00471 int *q = (int *) p;
00472 #endif
00473 memcpy(p, PQgetvalue(res, i, j), sizeof(float));
00474 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
00475 *q = ntohl(*q);
00476 #endif
00477 cp_vector_set_element(rec, j, p);
00478 }
00479 break;
00480
00481 case CP_FIELD_TYPE_DOUBLE:
00482 {
00483 double *p = malloc(sizeof(double));
00484 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
00485 int *q = (int *) p;
00486 int w;
00487 #endif
00488 memcpy(p, PQgetvalue(res, i, j), sizeof(double));
00489 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
00490
00491 q[0] = ntohl(q[0]);
00492 q[1] = ntohl(q[1]);
00493 w = q[0];
00494 q[0] = q[1];
00495 q[1] = w;
00496 #endif
00497 cp_vector_set_element(rec, j, p);
00498 }
00499 break;
00500
00501 case CP_FIELD_TYPE_TIME:
00502 case CP_FIELD_TYPE_DATE:
00503 case CP_FIELD_TYPE_TIMESTAMP:
00504 {
00505 union
00506 {
00507 double p;
00508 int q[2];
00509 } up;
00510 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
00511 int w;
00512 #endif
00513 cp_timestampz *t = calloc(1, sizeof(cp_timestampz));
00514 memcpy(&up.p, PQgetvalue(res, i, j), sizeof(double));
00515 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
00516 up.q[0] = ntohl(up.q[0]);
00517 up.q[1] = ntohl(up.q[1]);
00518 w = up.q[0];
00519 up.q[0] = up.q[1];
00520 up.q[1] = w;
00521 #endif
00522 up.p += (30 * 365 + 7) * 24 * 3600;
00523 t->tm.tv_sec = up.p;
00524 t->tm.tv_usec = (up.p - t->tm.tv_sec) * 1000000;
00525 cp_vector_set_element(rec, j, t);
00526 }
00527 break;
00528
00529 default:
00530 cp_vector_set_element(rec, j,
00531 cp_string_create(PQgetvalue(res, i, j),
00532 PQgetlength(res, i, j)));
00533 break;
00534 }
00535 }
00536 else if (*type == CP_FIELD_TYPE_BLOB)
00537 cp_vector_set_element(rec, j,
00538 pgsql_unescape_binary(NULL, PQgetvalue(res, i, j)));
00539 else
00540 cp_vector_set_element(rec, j,
00541 cp_string_create(PQgetvalue(res, i, j),
00542 PQgetlength(res, i, j)));
00543 }
00544 }
00545
00546 cp_list_append(list, rec);
00547 }
00548
00549 if (i == result_set->row_count)
00550 {
00551 result_set->fetch_complete = 1;
00552 PQclear(res);
00553 result_set->source = NULL;
00554 }
00555
00556 return i;
00557 }
00558
00559 static cp_result_set *pgsql_select(cp_db_connection *_conn, char *query)
00560 {
00561 int rc;
00562 PGresult *res = NULL;
00563 PGconn *conn = (PGconn *) _conn->connection;
00564 cp_result_set *result_set = NULL;
00565
00566 rc = PQsendQuery(conn, query);
00567 while (PQflush(conn) == 1);
00568 if (rc == 0)
00569 {
00570 cp_error(CP_DBMS_QUERY_FAILED, "%s", query);
00571 return NULL;
00572 }
00573
00574 if ((res = PQgetResult(conn)) != NULL)
00575 {
00576 result_set = create_result_set(_conn, res);
00577 do
00578 {
00579 if (pgsql_result_error(res)) return NULL;
00580 fetch_rows(result_set, res);
00581
00582 if (!_conn->read_result_set_at_once) break;
00583 }
00584 while ((res = PQgetResult(conn)) != NULL);
00585 }
00586 else while (PQflush(conn) == 1);
00587
00588 return result_set;
00589 }
00590
00591 static int convert_field_type(int field_type)
00592 {
00593 int *type = cp_hashtable_get(pgsql_field_type_map, &field_type);
00594 return type != NULL ? *type : field_type;
00595 }
00596
00597 static int pgsql_fetch_metadata(cp_result_set *result_set)
00598 {
00599 int rc = -1;
00600 int j;
00601
00602 if (result_set->field_headers == NULL)
00603 {
00604 PGresult *res = result_set->source;
00605 result_set->field_headers =
00606 cp_vector_create_by_option(result_set->field_count,
00607 COLLECTION_MODE_NOSYNC |
00608 COLLECTION_MODE_COPY |
00609 COLLECTION_MODE_DEEP |
00610 COLLECTION_MODE_MULTIPLE_VALUES,
00611 (cp_copy_fn) strdup,
00612 (cp_destructor_fn) free);
00613 result_set->field_types =
00614 cp_vector_create_by_option(result_set->field_count,
00615 COLLECTION_MODE_NOSYNC |
00616 COLLECTION_MODE_COPY |
00617 COLLECTION_MODE_DEEP |
00618 COLLECTION_MODE_MULTIPLE_VALUES,
00619 (cp_copy_fn) intdup,
00620 (cp_destructor_fn) free);
00621 for (j = 0; j < result_set->field_count; j++)
00622 {
00623 int type = convert_field_type(PQftype(result_set->source, j));
00624 cp_vector_set_element(result_set->field_headers, j,
00625 PQfname(res, j));
00626 cp_vector_set_element(result_set->field_types, j, &type);
00627 }
00628 rc = 0;
00629 }
00630
00631 return rc;
00632 }
00633
00634 static int pgsql_fetch_next(cp_result_set *result_set)
00635 {
00636 int i, j;
00637 PGresult *res = result_set->source;
00638 PGconn *conn = (PGconn *) result_set->connection->connection;
00639 cp_vector *rec;
00640
00641 while (res)
00642 {
00643 if (pgsql_result_error(res)) return -1;
00644
00645 for (i = result_set->position; i < result_set->row_count; i++)
00646 {
00647 if (((i + 1) % result_set->connection->fetch_size) == 0) break;
00648
00649 rec = cp_vector_create_by_option(result_set->field_count,
00650 COLLECTION_MODE_DEEP,
00651 NULL, (cp_destructor_fn) cp_string_destroy);
00652
00653 for (j = 0; j < result_set->field_count; j++)
00654 {
00655 if (!PQgetisnull(res, i, j))
00656 cp_vector_set_element(rec, j,
00657 cp_string_create(PQgetvalue(res, i, j),
00658 PQgetlength(res, i, j)));
00659 }
00660
00661 cp_list_append(result_set->results, rec);
00662 }
00663
00664 if (i == result_set->row_count)
00665 {
00666 result_set->fetch_complete = 1;
00667 PQclear(res);
00668 result_set->source = NULL;
00669 }
00670 else
00671 break;
00672
00673 res = PQgetResult(conn);
00674 }
00675
00676 return 0;
00677 }
00678
00679 static int pgsql_release_result_set(cp_result_set *result_set)
00680 {
00681 if (result_set->source)
00682 {
00683 PGresult *res = result_set->source;
00684 PGconn *conn = (PGconn *) result_set->connection->connection;
00685 do
00686 {
00687 PQclear(res);
00688 } while ((res = PQgetResult(conn)) != NULL);
00689 }
00690
00691 return 0;
00692 }
00693
00694 static int pgsql_update(cp_db_connection *_conn, char *query)
00695 {
00696 int rows = 0;
00697 PGresult *res = NULL;
00698 PGconn *conn = (PGconn*) _conn->connection;
00699 int rc;
00700
00701 rc = PQsendQuery(conn, query);
00702 while (PQflush(conn) == 1);
00703 if (rc == 0)
00704 {
00705 cp_error(CP_DBMS_QUERY_FAILED, "%s", query);
00706 return -1;
00707 }
00708
00709 while ((res = PQgetResult(conn)) != NULL)
00710 {
00711 switch (PQresultStatus(res))
00712 {
00713 case PGRES_BAD_RESPONSE:
00714 case PGRES_NONFATAL_ERROR:
00715 case PGRES_FATAL_ERROR:
00716 cp_error(CP_DBMS_QUERY_FAILED, "postgres: %s", query);
00717 cp_error(CP_DBMS_QUERY_FAILED,
00718 "postgres: %s", PQresultErrorMessage(res));
00719 PQclear(res);
00720 return -1;
00721
00722 default:
00723 rows += atoi(PQcmdTuples(res));
00724 break;
00725 }
00726 PQclear(res);
00727 }
00728
00729 return rows;
00730 }
00731
00732 static char *pgsql_escape_string(cp_db_connection *conn, char *src, int len)
00733 {
00734 char *res = malloc(len * 2 + 1);
00735 PQescapeString(res, src, len);
00736 return res;
00737 }
00738
00739 static char *pgsql_escape_binary(cp_db_connection *conn, char *src,
00740 int len, int *res_len)
00741 {
00742 return PQescapeBytea(src, len, res_len);
00743 }
00744
00745
00746
00747 static Oid cprops2pgsql(cp_field_type type)
00748 {
00749 switch(type)
00750 {
00751 case CP_FIELD_TYPE_BOOLEAN: return 16;
00752 case CP_FIELD_TYPE_CHAR: return 1043;
00753 case CP_FIELD_TYPE_SHORT:return 21;
00754 case CP_FIELD_TYPE_INT: return 23;
00755 case CP_FIELD_TYPE_LONG: return sizeof(long) == sizeof(int) ? 23 : 20;
00756 case CP_FIELD_TYPE_LONG_LONG: return 20;
00757 case CP_FIELD_TYPE_FLOAT: return 700;
00758 case CP_FIELD_TYPE_DOUBLE: return 701;
00759 case CP_FIELD_TYPE_VARCHAR: return 1043;
00760 case CP_FIELD_TYPE_BLOB: return 17;
00761 case CP_FIELD_TYPE_DATE: return 1114;
00762 case CP_FIELD_TYPE_TIME: return 1083;
00763 case CP_FIELD_TYPE_TIMESTAMP: return 1114;
00764 default: return 2276;
00765 }
00766
00767 return 2276;
00768 }
00769
00770 char *format_prm(char *query, int prm_count)
00771 {
00772 char *subst = query;
00773 char marker[5];
00774 int count = 1;
00775
00776 if (prm_count)
00777 {
00778 cp_string *buf = NULL;
00779 char *curr = query;
00780 char *prev = NULL;
00781 while ((curr = strchr(curr, '?')) != NULL)
00782 {
00783 if (buf == NULL)
00784 {
00785 buf = cp_string_create("", 0);
00786 prev = query;
00787 }
00788
00789 cp_string_cat_bin(buf, prev, curr - prev);
00790 #ifdef CP_HAS_SNPRINTF
00791 snprintf(marker, 5, "$%d", count);
00792 #else
00793 sprintf(marker, "$%d", count);
00794 #endif
00795 count++;
00796 cp_string_cstrcat(buf, marker);
00797 prev = ++curr;
00798 }
00799 if (prev) cp_string_cstrcat(buf, prev);
00800 if (buf)
00801 {
00802 subst = buf->data;
00803 free(buf);
00804 }
00805 }
00806
00807 return subst;
00808 }
00809
00810 static cp_db_statement *pgsql_prepare_statement(cp_db_connection *conn,
00811 int prm_count,
00812 cp_field_type *prm_type,
00813 char *query)
00814 {
00815 cp_db_statement *statement = NULL;
00816 int i;
00817 PGresult *res;
00818 Oid *paramTypes = malloc(prm_count * sizeof(Oid));
00819 char *stmt = malloc(21);
00820 char *subst;
00821 ExecStatusType rc;
00822
00823 #ifdef CP_HAS_STRLCPY
00824 strlcpy(stmt, "STMT", 21);
00825 #else
00826 strcpy(stmt, "STMT");
00827 #endif
00828 gen_id_str(&stmt[4]);
00829
00830 for (i = 0; i < prm_count; i++)
00831 paramTypes[i] = cprops2pgsql(prm_type[i]);
00832
00833
00834 subst = format_prm(query, prm_count);
00835 res = PQprepare(conn->connection, stmt, subst, prm_count, paramTypes);
00836 free(paramTypes);
00837 if (subst != query) free(subst);
00838
00839 if (res == NULL)
00840 {
00841 cp_error(CP_DBMS_CLIENT_ERROR, "%s - prepare statement: %s",
00842 conn->data_source->act->dbms_lit,
00843 PQerrorMessage(conn->connection));
00844 free(stmt);
00845 }
00846 else
00847 {
00848 rc = PQresultStatus(res);
00849 switch (rc)
00850 {
00851 case PGRES_EMPTY_QUERY:
00852 case PGRES_BAD_RESPONSE:
00853 case PGRES_FATAL_ERROR:
00854 free(stmt);
00855 stmt = NULL;
00856 cp_error(CP_DBMS_CLIENT_ERROR, "%s - prepare statement: %s",
00857 conn->data_source->act->dbms_lit,
00858 PQresultErrorMessage(res));
00859 break;
00860
00861 case PGRES_NONFATAL_ERROR:
00862 cp_warn("%s - prepare statement: %s",
00863 conn->data_source->act->dbms_lit,
00864 PQresultErrorMessage(res));
00865 default:
00866 statement = cp_db_statement_instantiate(conn, prm_count,
00867 prm_type, stmt);
00868 if (statement == NULL)
00869 {
00870
00871 free(stmt);
00872 }
00873 break;
00874
00875 }
00876 PQclear(res);
00877 }
00878
00879 if (statement)
00880 statement->store = memdup(prm_type, prm_count * sizeof(cp_field_type));
00881
00882 return statement;
00883 }
00884
00885 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
00886
00887 void hton8byte(void *ptr)
00888 {
00889 int w;
00890 union
00891 {
00892 double d;
00893 int i[2];
00894 } *u;
00895
00896 u = ptr;
00897 w = ntohl(u->i[0]);
00898 u->i[0] = ntohl(u->i[1]);
00899 u->i[1] = w;
00900 }
00901
00902
00903 static void **to_network_order(cp_field_type *types, void **prm, int count)
00904 {
00905 int i;
00906 void **res = calloc(count, sizeof(void *));
00907 short *pshort;
00908 int *pint;
00909 long *plong;
00910 float *pfloat;
00911 double *pdouble;
00912
00913 for (i = 0; i < count; i++)
00914 {
00915 switch (types[i])
00916 {
00917 case CP_FIELD_TYPE_SHORT:
00918 pshort = malloc(sizeof(short));
00919 memcpy(pshort, prm[i], sizeof(short));
00920 *pshort = htons(*pshort);
00921 res[i] = pshort;
00922 break;
00923
00924 case CP_FIELD_TYPE_INT:
00925 pint = malloc(sizeof(int));
00926 memcpy(pint, prm[i], sizeof(int));
00927 *pint = htonl(*pint);
00928 res[i] = pint;
00929 break;
00930
00931 case CP_FIELD_TYPE_LONG:
00932 if (sizeof(long) == sizeof(int))
00933 {
00934 plong = malloc(sizeof(long));
00935 memcpy(plong, prm[i], sizeof(long));
00936 *plong = htonl(*plong);
00937 res[i] = plong;
00938 break;
00939 }
00940
00941 case CP_FIELD_TYPE_FLOAT:
00942 pfloat = malloc(sizeof(float));
00943 memcpy(pfloat, prm[i], sizeof(float));
00944 plong = (long *) pfloat;
00945 *plong = htonl(*plong);
00946 res[i] = pfloat;
00947 break;
00948
00949 case CP_FIELD_TYPE_DOUBLE:
00950 pdouble = malloc(sizeof(double));
00951 memcpy(pdouble, prm[i], sizeof(double));
00952 hton8byte(pdouble);
00953 res[i] = pdouble;
00954 break;
00955
00956 case CP_FIELD_TYPE_TIME:
00957 case CP_FIELD_TYPE_DATE:
00958 case CP_FIELD_TYPE_TIMESTAMP:
00959 res[i] = prm[i];
00960 hton8byte(res[i]);
00961 break;
00962
00963 default:
00964 res[i] = prm[i];
00965 }
00966 }
00967
00968 return res;
00969 }
00970
00971 void delete_prm(void **in_order_prm, cp_field_type *types, int count)
00972 {
00973 int i;
00974
00975 for (i = 0; i < count; i++)
00976 {
00977 switch (types[i])
00978 {
00979 case CP_FIELD_TYPE_SHORT:
00980 case CP_FIELD_TYPE_INT:
00981 case CP_FIELD_TYPE_FLOAT:
00982 case CP_FIELD_TYPE_DOUBLE:
00983 free(in_order_prm[i]);
00984 break;
00985
00986 case CP_FIELD_TYPE_LONG:
00987 if (sizeof(long) == sizeof(int))
00988 {
00989 free(in_order_prm[i]);
00990 break;
00991 }
00992
00993 default:
00994 break;
00995 }
00996 }
00997
00998 free(in_order_prm);
00999 }
01000 #endif
01001
01002 static void *to_pgtime(cp_timestampz *tmz)
01003 {
01004 double *d = calloc(1, sizeof(double));
01005 *d = tmz->tm.tv_sec + ((double) tmz->tm.tv_usec) / 1e6
01006 + tmz->tz_minuteswest * 60 - ((30 * 365 + 7) * 86400);
01007 return d;
01008 }
01009
01010 static void subst_cp2pgsql(int count, cp_field_type *types, void **prm, int *lengths)
01011 {
01012 int i;
01013 short *s;
01014 char *ch;
01015
01016 for (i = 0; i < count; i++)
01017 {
01018 switch (types[i])
01019 {
01020 case CP_FIELD_TYPE_BOOLEAN:
01021 s = prm[i];
01022 ch = prm[i];
01023 *ch = (prm[i] == 0 ? 'f' : 't');
01024 lengths[i] = 1;
01025 break;
01026
01027 case CP_FIELD_TYPE_TIME:
01028 case CP_FIELD_TYPE_DATE:
01029 case CP_FIELD_TYPE_TIMESTAMP:
01030 prm[i] = to_pgtime(prm[i]);
01031 lengths[i] = sizeof(double);
01032 break;
01033
01034 default:
01035 break;
01036 }
01037 }
01038 }
01039
01040 static void free_cp2pgsql(int count, cp_field_type *types, void **prm)
01041 {
01042 int i;
01043
01044 for (i = 0; i < count; i++)
01045 {
01046 if (types[i] == CP_FIELD_TYPE_TIME ||
01047 types[i] == CP_FIELD_TYPE_DATE ||
01048 types[i] == CP_FIELD_TYPE_TIMESTAMP)
01049 {
01050 free(prm[i]);
01051 }
01052 }
01053 }
01054
01055 static int pgsql_execute_statement(cp_db_connection *conn,
01056 cp_db_statement *stmt,
01057 cp_result_set **result_set,
01058 int *lengths,
01059 void **prm)
01060 {
01061 int *fmt = NULL;
01062 cp_result_set *res = NULL;
01063 PGresult *pgres = NULL;
01064 ExecStatusType ret;
01065 int i;
01066 int rc = 0;
01067 cp_field_type *types = NULL;
01068 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
01069 void **in_order_prm = prm;
01070 #endif
01071
01072 subst_cp2pgsql(stmt->prm_count, stmt->store, prm, lengths);
01073
01074 if (lengths)
01075 {
01076 types = stmt->store;
01077 fmt = calloc(stmt->prm_count, sizeof(int));
01078 for (i = 0; i < stmt->prm_count; i++) fmt[i] = 1;
01079 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
01080 in_order_prm = to_network_order(types, prm, stmt->prm_count);
01081 #endif
01082 }
01083
01084 pgres = PQexecPrepared(conn->connection, (char *) stmt->source,
01085 stmt->prm_count,
01086 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
01087 (const char * const *) in_order_prm,
01088 #else
01089 (const char * const *) prm,
01090 #endif
01091 lengths, fmt, stmt->binary);
01092 free_cp2pgsql(stmt->prm_count, stmt->store, prm);
01093 if (fmt) free(fmt);
01094 #ifdef CP_BYTE_ORDER_LITTLE_ENDIAN
01095 if (in_order_prm != prm) delete_prm(in_order_prm, types, stmt->prm_count);
01096 #endif
01097
01098 ret = PQresultStatus(pgres);
01099 switch (ret)
01100 {
01101 case PGRES_TUPLES_OK:
01102 res = create_result_set(conn, pgres);
01103 res->binary = stmt->binary;
01104 fetch_rows(res, pgres);
01105 break;
01106
01107 case PGRES_COMMAND_OK:
01108 rc = atoi(PQcmdTuples(pgres));
01109 PQclear(pgres);
01110 break;
01111
01112 case PGRES_EMPTY_QUERY:
01113 case PGRES_BAD_RESPONSE:
01114 case PGRES_FATAL_ERROR:
01115 case PGRES_NONFATAL_ERROR:
01116 default:
01117 rc = -1;
01118 PQclear(pgres);
01119 cp_error(CP_DBMS_QUERY_FAILED, "%s - prepared statement: %s",
01120 conn->data_source->act->dbms_lit,
01121 PQresultErrorMessage(pgres));
01122 break;
01123 }
01124
01125 if (result_set)
01126 *result_set = res;
01127
01128 return rc;
01129 }
01130
01131 static void pgsql_release_statement(cp_db_statement *statement)
01132 {
01133 if (statement->source) free(statement->source);
01134 if (statement->store) free(statement->store);
01135 }
01136
01137 static void pgsql_set_autocommit(cp_db_connection *conn, int state)
01138 {
01139 if (state == 0)
01140 pgsql_update(conn, "begin;");
01141 }
01142
01143 static int pgsql_commit(cp_db_connection *conn)
01144 {
01145 int rc = pgsql_update(conn, "commit;");
01146 if (conn->autocommit == 0)
01147 pgsql_update(conn, "begin;");
01148 return rc;
01149 }
01150
01151 static int pgsql_rollback(cp_db_connection *conn)
01152 {
01153 int rc = pgsql_update(conn, "rollback;");
01154 if (conn->autocommit == 0)
01155 pgsql_update(conn, "begin;");
01156 return rc;
01157 }
01158
01159 #ifdef WINDOWS
01160 BOOL APIENTRY DllMain( HANDLE hModule,
01161 DWORD ul_reason_for_call,
01162 LPVOID lpReserved
01163 )
01164 {
01165 switch (ul_reason_for_call)
01166 {
01167 case DLL_PROCESS_ATTACH:
01168 case DLL_THREAD_ATTACH:
01169 case DLL_THREAD_DETACH:
01170 case DLL_PROCESS_DETACH:
01171 break;
01172 }
01173 return TRUE;
01174 }
01175 #endif