Project

General

Profile

main.cpp

Xiaopong Tran, 06/08/2012 03:15 AM

Download (4.53 KB)

 
1
/*
2
 * Copyright (C) 2012, xp@renzhi.ca
3
 * All rights reserved.
4
 */
5

    
6
#include <iostream>
7
#include <stdio.h>
8
#include <string.h>
9
#include <getopt.h>
10
#include <stdlib.h>
11
#include <errno.h>
12
#include <pthread.h>
13
#include <rados/librados.h>
14

    
15
using namespace std;
16

    
17
void print_usage()
18
{
19
    cout << endl;
20
    cout << "Usage: " << endl;
21
    cout << "  test_rados -n num -p pool -i file" << endl;
22
    cout << endl;
23
    cout << "  where" << endl;
24
    cout << "    num              Number of client threads" << endl;
25
    cout << "    pool             Pool name to write to" << endl;
26
    cout << "    file             Input file to write to pool" << endl;
27
}
28

    
29
void * worker(void * arg);
30

    
31
typedef struct
32
{
33
    int id;
34
    char * filename;
35
    char * pool;
36
} param_t;
37

    
38
const char * conf_file = "./etc/ceph.conf";
39

    
40
int main(int argc, char *argv[])
41
{
42
    if (argc < 4)
43
    {
44
        print_usage();
45
        return -1;
46
    }
47

    
48
    int num_threads = 0;
49
    char * pool_name;
50
    char * filename;
51

    
52
    int c;
53
    opterr = 0;
54
    while ((c = getopt (argc, argv, "n:p:i:")) != -1) 
55
    {
56
        switch(c)
57
        {
58
        case 'n':
59
            num_threads = atoi(optarg);
60
            break;
61
        case 'p':
62
            pool_name = optarg;
63
            break;
64
        case 'i':
65
            filename = optarg;
66
            break;
67
        case '?':
68
            print_usage();
69
            return 1;
70
        default:
71
            if (isprint (optopt))
72
                cout << "Unknown option `-%c'." <<  optopt << endl;
73
            else 
74
                cout << "Unknown option character `\\x%x'." << optopt << endl;
75
            break;
76
        }
77
    }
78

    
79
    pthread_t *threads;
80
    pthread_attr_t  pthread_custom_attr;
81
    param_t * parms;
82

    
83
    threads = (pthread_t *) malloc(num_threads * sizeof(pthread_t));
84
    pthread_attr_init(&pthread_custom_attr);
85
    parms = (param_t *)malloc(sizeof(param_t) * num_threads);
86

    
87
    for (int i = 0; i < num_threads; ++i) 
88
    {
89
        parms[i].id = i;
90
        parms[i].filename = filename;
91
        parms[i].pool = pool_name;
92
        pthread_create(&threads[i], &pthread_custom_attr, worker, (void *)(parms+i));
93
    }
94

    
95
    for (int i = 0; i < num_threads; ++i) 
96
    {
97
        pthread_join(threads[i], NULL);
98
    }
99

    
100
    free(parms);
101
    return 0;
102
}
103

    
104

    
105
void * worker(void * arg)
106
{
107
    param_t * param = (param_t *) arg;
108
    rados_t cluster;
109
    cout << "Thread " << param->id << ": creating clusting handle" << endl;
110
    int err = rados_create(&cluster, NULL);
111
    if (err < 0) 
112
    {
113
        cout << "Thread " << param->id << ": Cannot create a cluster handle: " << strerror(-err) << endl;
114
        return NULL;
115
    }
116
    cout << "Thread " << param->id << ": reading conf file" << endl;
117
    err = rados_conf_read_file(cluster, conf_file);
118
    if (err < 0) 
119
    {
120
        cout << "Thread " << param->id << ": Cannot read config file:" << strerror(-err) << endl;
121
        return NULL;
122
    }
123
    cout << "Thread " << param->id << ": connecting to cluster" << endl;
124
    err = rados_connect(cluster);
125
    if (err < 0) 
126
    {
127
        cout << "Thread " << param->id << ": Cannot connect to cluster: " << strerror(-err) << endl;
128
        return NULL;
129
    }
130
    cout << "Thread " << param->id << ": opening rados pool" << endl;
131
    rados_ioctx_t io;
132
    err = rados_ioctx_create(cluster, param->pool, &io);
133
    if (err < 0) 
134
    {
135
        cout << "Thread " << param->id << ": Cannot open rados pool: " << strerror(-err) << endl;
136
        rados_shutdown(cluster);
137
        return NULL;
138
    }
139
    
140
    char buf[1024];
141
    size_t len_read, len_write;
142
    size_t total = 0;
143
    FILE * fr = fopen(param->filename, "rb");
144
    if (!fr)
145
    {
146
        cout << "Thread " << param->id << ": Error opening for read: " << param->filename << endl;
147
        rados_shutdown(cluster);
148
        return NULL;
149
    }
150
    // Use the thread id as the oid
151
    char oid[64];
152
    memset(oid, 0, 64);
153
    sprintf(oid, "%d", param->id);
154
    while (!feof(fr) && !ferror(fr))
155
    {
156
        memset(buf, 0, 1024);
157
        len_read = fread(buf, 1, 1024, fr);
158
        len_write = rados_write(io, oid, buf, len_read, total);
159
        if (len_read != len_write)
160
        {
161
            cout << "Thread " << param->id << ": Error writing: read=" << len_read << ", write=" << len_write << endl;
162
            break;
163
        }
164
        total += len_write;
165
        cout << "Thread " << param->id << ": Reading and writing: " << total << endl;
166
    }
167

    
168
    fclose(fr);
169

    
170
    cout << "Thread " << param->id << ": Destroying IO context" << endl;
171
    rados_ioctx_destroy(io);
172
    cout << "Thread " << param->id << ": Disconnect from cluster" << endl;
173
    rados_shutdown(cluster);
174

    
175
    return NULL;
176
}