35#include "XrdVersion.hh"
43 File(std::unique_ptr<XrdOssDF> wrapDF, XrdThrottleManager &throttle, XrdSysError *lP, XrdOucTrace *tP)
44 : XrdOssWrapDF(*wrapDF), m_log(lP), m_throttle(throttle), m_trace(tP), m_wrapped(std::move(wrapDF)) {}
48virtual int Open(
const char *path,
int Oflag, mode_t
Mode,
49 XrdOucEnv &env)
override {
51 std::tie(m_user, m_uid) = m_throttle.GetUserInfo(env.
secEnv());
53 std::string open_error_message;
54 if (!m_throttle.OpenFile(m_user, open_error_message)) {
59 auto rval = wrapDF.Open(path, Oflag,
Mode, env);
62 m_throttle.CloseFile(m_user);
68virtual int Close(
long long *retsz)
override {
69 m_throttle.CloseFile(m_user);
70 return wrapDF.Close(retsz);
73virtual int getFD()
override {
return -1;}
75virtual off_t getMmap(
void **addr)
override {*addr = 0;
return 0;}
77virtual ssize_t pgRead (
void* buffer, off_t offset,
size_t rdlen,
78 uint32_t* csvec, uint64_t
opts)
override {
80 return DoThrottle(rdlen, 1,
81 static_cast<ssize_t (XrdOssDF::*)(
void*, off_t,
size_t, uint32_t*, uint64_t)
>(&
XrdOssDF::pgRead),
82 buffer, offset, rdlen, csvec,
opts);
85virtual int pgRead(XrdSfsAio *aioparm, uint64_t
opts)
override
95virtual ssize_t pgWrite(
void* buffer, off_t offset,
size_t wrlen,
96 uint32_t* csvec, uint64_t
opts)
override {
98 return DoThrottle(wrlen, 1,
99 static_cast<ssize_t (XrdOssDF::*)(
void*, off_t,
size_t, uint32_t*, uint64_t)
>(&
XrdOssDF::pgWrite),
100 buffer, offset, wrlen, csvec,
opts);
103virtual int pgWrite(XrdSfsAio *aioparm, uint64_t
opts)
override
113virtual ssize_t
Read(off_t offset,
size_t size)
override {
114 return DoThrottle(size, 1,
115 static_cast<ssize_t (XrdOssDF::*)(off_t,
size_t)
>(&
XrdOssDF::Read),
118virtual ssize_t
Read(
void* buffer, off_t offset,
size_t size)
override {
119 return DoThrottle(size, 1,
120 static_cast<ssize_t (XrdOssDF::*)(
void*, off_t,
size_t)
>(&
XrdOssDF::Read),
121 buffer, offset, size);
124virtual int Read(XrdSfsAio *aiop)
override {
132virtual ssize_t ReadV(XrdOucIOVec *readV,
int rdvcnt)
override {
134 for (
int i = 0; i < rdvcnt; ++i) {
135 sum += readV[i].
size;
141virtual ssize_t
Write(
const void* buffer, off_t offset,
size_t size)
override {
142 return DoThrottle(size, 1,
143 static_cast<ssize_t (XrdOssDF::*)(
const void*, off_t,
size_t)
>(&
XrdOssDF::Write),
144 buffer, offset, size);
147virtual int Write(XrdSfsAio *aiop)
override {
157 template <
class Fn,
class... Args>
158 int DoThrottle(
size_t rdlen,
size_t ops, Fn &&fn, Args &&... args) {
159 m_throttle.Apply(rdlen, ops, m_uid);
161 XrdThrottleTimer timer = m_throttle.StartIOTimer(m_uid, ok);
166 return std::invoke(fn, wrapDF, std::forward<Args>(args)...);
169 XrdSysError *m_log{
nullptr};
170 XrdThrottleManager &m_throttle;
171 XrdOucTrace *m_trace{
nullptr};
172 std::unique_ptr<XrdOssDF> m_wrapped;
176 static constexpr char TraceID[] =
"XrdThrottleFile";
181 FileSystem(XrdOss *oss, XrdSysLogger *log, XrdOucEnv *envP)
182 : XrdOssWrapper(*oss),
185 m_log(new XrdSysError(log)),
186 m_trace(new XrdOucTrace(m_log.get())),
187 m_throttle(m_log.get(), m_trace.get())
193 auto gstream =
reinterpret_cast<XrdXrootdGStream*
>(
envP->
GetPtr(
"Throttle.gStream*"));
194 m_log->Say(
"Config",
"Throttle g-stream has", gstream ?
"" :
" NOT",
" been configured via xrootd.mongstream directive");
195 m_throttle.SetMonitor(gstream);
199 int Configure(
const std::string &config_filename) {
200 XrdThrottle::Configuration config(*m_log, m_env);
201 if (config.Configure(config_filename)) {
202 m_log->Emsg(
"Config",
"Unable to load configuration file", config_filename.c_str());
205 m_throttle.FromConfig(config);
209 virtual ~FileSystem() {}
211 virtual XrdOssDF *newFile(
const char *user = 0)
override {
212 std::unique_ptr<XrdOssDF> wrapped(wrapPI.newFile(user));
213 return new File(std::move(wrapped), m_throttle, m_log.get(), m_trace.get());
217 XrdOucEnv *m_env{
nullptr};
218 std::unique_ptr<XrdOss> m_oss;
219 std::unique_ptr<XrdSysError> m_log{
nullptr};
220 std::unique_ptr<XrdOucTrace> m_trace{
nullptr};
221 XrdThrottleManager m_throttle;
229 const char *config_fn,
const char *parms,
231 std::unique_ptr<FileSystem> fs(
new FileSystem(curr_oss, logger, envP));
232 if (fs->Configure(config_fn)) {
233 XrdSysError(logger,
"XrdThrottle").
Say(
"Config",
"Unable to load configuration file", config_fn);
244 envP->
PutInt(
"XrdOssThrottle", 1);
XrdOss * XrdOssAddStorageSystem2(XrdOss *curr_oss, XrdSysLogger *Logger, const char *config_fn, const char *parms, XrdOucEnv *envP)
XrdOss * XrdOssAddStorageSystem2(XrdOss *curr_oss, XrdSysLogger *logger, const char *config_fn, const char *parms, XrdOucEnv *envP)
XrdVERSIONINFO(XrdOssAddStorageSystem2, throttle)
virtual ssize_t Read(off_t offset, size_t size)
virtual ssize_t pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts)
virtual ssize_t pgRead(void *buffer, off_t offset, size_t rdlen, uint32_t *csvec, uint64_t opts)
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
void PutInt(const char *varname, long value)
const XrdSecEntity * secEnv() const
void * GetPtr(const char *varname)
virtual void doneRead()=0
virtual void doneWrite()=0
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
ReadImpl< false > Read(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating ReadImpl objects.
CloseImpl< false > Close(Ctx< File > file, uint16_t timeout=0)
Factory for creating CloseImpl objects.
WriteImpl< false > Write(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< const void * > buffer, uint16_t timeout=0)
Factory for creating WriteImpl objects.
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, uint16_t timeout=0)
Factory for creating ReadImpl objects.