1
|
|
2
|
|
3
|
|
4
|
|
5
|
|
6
|
#include <iostream>

#include <ctype.h>
#include <errno.h>
#include <getopt.h>
#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <rados/librados.h>
|
14
|
|
15
|
using namespace std;
|
16
|
|
17
|
/**
 * Print the command-line usage summary to standard output.
 *
 * Emits a leading blank line, the invocation synopsis, a separating blank
 * line, and one descriptive line per option. Each line is terminated with
 * endl, so the stream is flushed after every line.
 */
void print_usage()
{
    // Lines are emitted in order; an empty entry produces a blank line.
    static const char *const usage_lines[] = {
        "",
        "Usage: ",
        " test_rados -n num -p pool -i file",
        "",
        " where",
        " num Number of client threads",
        " pool Pool name to write to",
        " file Input file to write to pool",
    };
    const size_t line_count = sizeof usage_lines / sizeof usage_lines[0];

    for (size_t idx = 0; idx < line_count; ++idx)
    {
        cout << usage_lines[idx] << endl;
    }
}
|
28
|
|
29
|
/* Thread start routine passed to pthread_create() in main(); defined later in
 * this file. arg is cast to param_t * by the definition. */
void * worker(void * arg);
|
30
|
|
31
|
/**
 * Per-thread arguments handed to worker() through pthread_create().
 *
 * The string members are plain pointers; this struct does not copy or free
 * what they point to.
 */
typedef struct
{
    int id;          /* thread index; also formatted as the object name */
    char *filename;  /* path of the input file the thread reads */
    char *pool;      /* name of the pool the thread writes to */
} param_t;
|
37
|
|
38
|
/* Path of the configuration file each worker passes to rados_conf_read_file().
 * The leading "./" makes it relative to the process's current directory. */
const char * conf_file = "./etc/ceph.conf";
|
39
|
|
40
|
int main(int argc, char *argv[])
|
41
|
{
|
42
|
if (argc < 4)
|
43
|
{
|
44
|
print_usage();
|
45
|
return -1;
|
46
|
}
|
47
|
|
48
|
int num_threads = 0;
|
49
|
char * pool_name;
|
50
|
char * filename;
|
51
|
|
52
|
int c;
|
53
|
opterr = 0;
|
54
|
while ((c = getopt (argc, argv, "n:p:i:")) != -1)
|
55
|
{
|
56
|
switch(c)
|
57
|
{
|
58
|
case 'n':
|
59
|
num_threads = atoi(optarg);
|
60
|
break;
|
61
|
case 'p':
|
62
|
pool_name = optarg;
|
63
|
break;
|
64
|
case 'i':
|
65
|
filename = optarg;
|
66
|
break;
|
67
|
case '?':
|
68
|
print_usage();
|
69
|
return 1;
|
70
|
default:
|
71
|
if (isprint (optopt))
|
72
|
cout << "Unknown option `-%c'." << optopt << endl;
|
73
|
else
|
74
|
cout << "Unknown option character `\\x%x'." << optopt << endl;
|
75
|
break;
|
76
|
}
|
77
|
}
|
78
|
|
79
|
pthread_t *threads;
|
80
|
pthread_attr_t pthread_custom_attr;
|
81
|
param_t * parms;
|
82
|
|
83
|
threads = (pthread_t *) malloc(num_threads * sizeof(pthread_t));
|
84
|
pthread_attr_init(&pthread_custom_attr);
|
85
|
parms = (param_t *)malloc(sizeof(param_t) * num_threads);
|
86
|
|
87
|
for (int i = 0; i < num_threads; ++i)
|
88
|
{
|
89
|
parms[i].id = i;
|
90
|
parms[i].filename = filename;
|
91
|
parms[i].pool = pool_name;
|
92
|
pthread_create(&threads[i], &pthread_custom_attr, worker, (void *)(parms+i));
|
93
|
}
|
94
|
|
95
|
for (int i = 0; i < num_threads; ++i)
|
96
|
{
|
97
|
pthread_join(threads[i], NULL);
|
98
|
}
|
99
|
|
100
|
free(parms);
|
101
|
return 0;
|
102
|
}
|
103
|
|
104
|
|
105
|
void * worker(void * arg)
|
106
|
{
|
107
|
param_t * param = (param_t *) arg;
|
108
|
rados_t cluster;
|
109
|
cout << "Thread " << param->id << ": creating clusting handle" << endl;
|
110
|
int err = rados_create(&cluster, NULL);
|
111
|
if (err < 0)
|
112
|
{
|
113
|
cout << "Thread " << param->id << ": Cannot create a cluster handle: " << strerror(-err) << endl;
|
114
|
return NULL;
|
115
|
}
|
116
|
cout << "Thread " << param->id << ": reading conf file" << endl;
|
117
|
err = rados_conf_read_file(cluster, conf_file);
|
118
|
if (err < 0)
|
119
|
{
|
120
|
cout << "Thread " << param->id << ": Cannot read config file:" << strerror(-err) << endl;
|
121
|
return NULL;
|
122
|
}
|
123
|
cout << "Thread " << param->id << ": connecting to cluster" << endl;
|
124
|
err = rados_connect(cluster);
|
125
|
if (err < 0)
|
126
|
{
|
127
|
cout << "Thread " << param->id << ": Cannot connect to cluster: " << strerror(-err) << endl;
|
128
|
return NULL;
|
129
|
}
|
130
|
cout << "Thread " << param->id << ": opening rados pool" << endl;
|
131
|
rados_ioctx_t io;
|
132
|
err = rados_ioctx_create(cluster, param->pool, &io);
|
133
|
if (err < 0)
|
134
|
{
|
135
|
cout << "Thread " << param->id << ": Cannot open rados pool: " << strerror(-err) << endl;
|
136
|
rados_shutdown(cluster);
|
137
|
return NULL;
|
138
|
}
|
139
|
|
140
|
char buf[1024];
|
141
|
size_t len_read, len_write;
|
142
|
size_t total = 0;
|
143
|
FILE * fr = fopen(param->filename, "rb");
|
144
|
if (!fr)
|
145
|
{
|
146
|
cout << "Thread " << param->id << ": Error opening for read: " << param->filename << endl;
|
147
|
rados_shutdown(cluster);
|
148
|
return NULL;
|
149
|
}
|
150
|
|
151
|
char oid[64];
|
152
|
memset(oid, 0, 64);
|
153
|
sprintf(oid, "%d", param->id);
|
154
|
while (!feof(fr) && !ferror(fr))
|
155
|
{
|
156
|
memset(buf, 0, 1024);
|
157
|
len_read = fread(buf, 1, 1024, fr);
|
158
|
len_write = rados_write(io, oid, buf, len_read, total);
|
159
|
if (len_read != len_write)
|
160
|
{
|
161
|
cout << "Thread " << param->id << ": Error writing: read=" << len_read << ", write=" << len_write << endl;
|
162
|
break;
|
163
|
}
|
164
|
total += len_write;
|
165
|
cout << "Thread " << param->id << ": Reading and writing: " << total << endl;
|
166
|
}
|
167
|
|
168
|
fclose(fr);
|
169
|
|
170
|
cout << "Thread " << param->id << ": Destroying IO context" << endl;
|
171
|
rados_ioctx_destroy(io);
|
172
|
cout << "Thread " << param->id << ": Disconnect from cluster" << endl;
|
173
|
rados_shutdown(cluster);
|
174
|
|
175
|
return NULL;
|
176
|
}
|